Merge pull request #578 from jbonofre/AMQ-8073
[AMQ-8073] Upgrade to commons-pool2 2.9.0
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0c7044f..1c17875 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -203,6 +203,7 @@
private final AtomicBoolean preShutdownHooksInvoked = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive = true;
+ private boolean enableMessageExpirationOnActiveDurableSubs = false;
private boolean useVirtualTopics = true;
private boolean useMirroredQueues = false;
private boolean useTempMirroredQueues = true;
@@ -1729,6 +1730,14 @@
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
+
+ public boolean isEnableMessageExpirationOnActiveDurableSubs() {
+ return enableMessageExpirationOnActiveDurableSubs;
+ }
+
+ public void setEnableMessageExpirationOnActiveDurableSubs(boolean enableMessageExpirationOnActiveDurableSubs) {
+ this.enableMessageExpirationOnActiveDurableSubs = enableMessageExpirationOnActiveDurableSubs;
+ }
public boolean isUseVirtualTopics() {
return useVirtualTopics;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 6a5c599..e58da62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -58,6 +58,7 @@
private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private boolean keepDurableSubsActive;
+ private boolean enableMessageExpirationOnActiveDurableSubs;
private final AtomicBoolean active = new AtomicBoolean();
private final AtomicLong offlineTimestamp = new AtomicLong(-1);
private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>();
@@ -69,6 +70,7 @@
this.pending.setSystemUsage(usageManager);
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.keepDurableSubsActive = keepDurableSubsActive;
+ this.enableMessageExpirationOnActiveDurableSubs = broker.getBrokerService().isEnableMessageExpirationOnActiveDurableSubs();
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
@@ -429,4 +431,8 @@
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}
+
+ public boolean isEnableMessageExpirationOnActiveDurableSubs() {
+ return enableMessageExpirationOnActiveDurableSubs;
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 7df0138..7210fa8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -686,7 +686,7 @@
final ConnectionContext connectionContext = createConnectionContext();
for (Message message : toExpire) {
for (DurableTopicSubscription sub : durableSubscribers.values()) {
- if (!sub.isActive()) {
+ if (!sub.isActive() || sub.isEnableMessageExpirationOnActiveDurableSubs()) {
message.setRegionDestination(this);
messageExpired(connectionContext, sub, message);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index bb62541..fb46768 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -30,10 +30,11 @@
private Collection forwardTo;
private boolean forwardOnly = true;
private boolean concurrentSend = false;
+ private boolean sendWhenNotMatched = false;
@Override
public Destination intercept(Destination destination) {
- return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
+ return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),isSendWhenNotMatched(), isConcurrentSend());
}
@Override
@@ -192,4 +193,12 @@
return true;
}
+
+ public boolean isSendWhenNotMatched() {
+ return sendWhenNotMatched;
+ }
+
+ public void setSendWhenNotMatched(boolean sendWhenNotMatched) {
+ this.sendWhenNotMatched = sendWhenNotMatched;
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 119257d..5bd5716 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -41,12 +41,14 @@
private Collection forwardDestinations;
private boolean forwardOnly;
private boolean concurrentSend = false;
+ private boolean sendWhenNotMatched=false;
- public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
+ public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly,boolean sendWhenNotMatched, boolean concurrentSend) {
super(next);
this.forwardDestinations = forwardDestinations;
this.forwardOnly = forwardOnly;
this.concurrentSend = concurrentSend;
+ this.sendWhenNotMatched = sendWhenNotMatched;
}
@Override
@@ -100,9 +102,17 @@
doForward(context, message, brokerService.getRegionBroker(), destination);
}
}
- if (!forwardOnly) {
- super.send(context, message);
+ if(sendWhenNotMatched)
+ {
+ if(matchingDestinations.size() <=0) {
+ super.send(context, message);
+ }
+ }else {
+ if (!forwardOnly) {
+ super.send(context, message);
+ }
}
+
concurrent.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
index 8624a06..01f6fcc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
@@ -345,20 +345,22 @@
private NetworkInterface findNetworkInterface() throws SocketException {
Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
List<NetworkInterface> possibles = new ArrayList<NetworkInterface>();
- while (ifcs.hasMoreElements()) {
- NetworkInterface ni = ifcs.nextElement();
- try {
- if (ni.supportsMulticast()
- && ni.isUp()) {
- for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
- if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
- && !ia.getAddress().isLoopbackAddress()
- && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
- possibles.add(ni);
+ if (ifcs != null) {
+ while (ifcs.hasMoreElements()) {
+ NetworkInterface ni = ifcs.nextElement();
+ try {
+ if (ni.supportsMulticast()
+ && ni.isUp()) {
+ for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
+ if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
+ && !ia.getAddress().isLoopbackAddress()
+ && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
+ possibles.add(ni);
+ }
}
}
- }
- } catch (SocketException ignored) {}
+ } catch (SocketException ignored) {}
+ }
}
return possibles.isEmpty() ? null : possibles.get(possibles.size() - 1);
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
index d6a50a1..0a717f4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
@@ -98,18 +98,21 @@
}
private void checkSecurity(Class clazz) throws ClassNotFoundException {
- if (!clazz.isPrimitive()) {
- if (clazz.getPackage() != null && !trustAllPackages()) {
- boolean found = false;
- for (String packageName : getTrustedPackages()) {
- if (clazz.getPackage().getName().equals(packageName) || clazz.getPackage().getName().startsWith(packageName + ".")) {
- found = true;
- break;
- }
- }
- if (!found) {
- throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
- }
+ if (trustAllPackages() || clazz.isPrimitive()) {
+ return;
+ }
+
+ boolean found = false;
+ Package thePackage = clazz.getPackage();
+ if (thePackage != null) {
+ for (String trustedPackage : getTrustedPackages()) {
+ if (thePackage.getName().equals(trustedPackage) || thePackage.getName().startsWith(trustedPackage + ".")) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
}
}
}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
index a7285da..b7766a2 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
@@ -161,21 +161,6 @@
return true;
}
- protected String readRequestBody(HttpServletRequest request) throws IOException {
- StringBuffer buffer = new StringBuffer();
- BufferedReader reader = request.getReader();
- while (true) {
- String line = reader.readLine();
- if (line == null) {
- break;
- } else {
- buffer.append(line);
- buffer.append("\n");
- }
- }
- return buffer.toString();
- }
-
protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
String clientID = request.getHeader("clientID");
if (clientID == null) {
diff --git a/activemq-karaf/src/main/resources/features-core.xml b/activemq-karaf/src/main/resources/features-core.xml
index f04f522..79a1ec5 100644
--- a/activemq-karaf/src/main/resources/features-core.xml
+++ b/activemq-karaf/src/main/resources/features-core.xml
@@ -52,6 +52,7 @@
<feature>http</feature>
<feature version="${project.version}">activemq-client</feature>
<bundle>mvn:org.apache.activemq/activemq-karaf/${project.version}</bundle>
+ <bundle dependency="true">mvn:commons-io/commons-io/${commons-io-version}</bundle>
<bundle dependency="true">mvn:commons-collections/commons-collections/${commons-collections-version}</bundle>
<bundle dependency='true'>mvn:commons-lang/commons-lang/${commons-lang-version}</bundle>
<bundle dependency="true">mvn:commons-codec/commons-codec/1.9</bundle>
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 339d9b8..cee87ba 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -74,6 +74,7 @@
javax.management*,
javax.transaction*;version="[1,3)",
javax.naming*;resolution:=optional,
+ org.apache.commons.io*;resolution:=optional,
org.apache.commons.pool*;resolution:=optional,
org.apache.commons.net*;resolution:=optional,
com.sun*;resolution:=optional,
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
new file mode 100644
index 0000000..ef71d1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -0,0 +1,517 @@
+package org.apache.activemq.broker.virtual;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.FilteredDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.spring.ConsumerBean;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
+
+ protected int total = 10;
+ protected Connection connection;
+
+
+ @Test
+ public void testSendWhenNotMatched() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.B");
+ Destination destination1 =new ActiveMQQueue("A.B");
+ Destination destination2 = new ActiveMQQueue("A.C");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(0);
+ }
+
+ @Test
+ public void testSendWhenMatched() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.B");
+ Destination destination1 =new ActiveMQQueue("A.B");
+ Destination destination2 = new ActiveMQQueue("A.C");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.D");
+ Destination destination1 =new ActiveMQQueue("A.D");
+ Destination destination2 = new ActiveMQQueue("A.E");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tes13"));
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(0);
+
+ }
+
+ public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.D");
+ Destination destination1 =new ActiveMQQueue("A.D");
+ Destination destination2 = new ActiveMQQueue("A.E");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+ Thread.sleep(1*1000);
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.X");
+ Destination destination1 =new ActiveMQQueue("A.X");
+ Destination destination2 = new ActiveMQQueue("A.Y");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(1);
+
+ }
+ @Test
+ public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.X");
+ Destination destination1 =new ActiveMQQueue("A.X");
+ Destination destination2 = new ActiveMQQueue("A.Y");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(0);
+
+ }
+
+ @Test
+ public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.W");
+ Destination destination1 =new ActiveMQQueue("A.W");
+ Destination destination2 = new ActiveMQQueue("A.V");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.W");
+ Destination destination1 =new ActiveMQQueue("A.W");
+ Destination destination2 = new ActiveMQQueue("A.V");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+ Thread.sleep(2*1000);
+ messageList1.assertMessagesArrived(0);
+ messageList2.assertMessagesArrived(0);
+
+ }
+
+ @Test
+ public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("X.Y");
+ Destination destination1 = new ActiveMQQueue("X.Y");
+ Destination destination2 = new ActiveMQQueue("X.Z");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("R.S");
+ Destination destination1 = new ActiveMQQueue("R.S");
+ Destination destination2 = new ActiveMQQueue("R.T");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(1);
+ }
+
+ protected Destination getConsumer1Dsetination() {
+ return new ActiveMQQueue("A.B");
+ }
+
+ protected Destination getConsumer2Dsetination() {
+ return new ActiveMQQueue("A.C");
+ }
+
+ protected Destination getProducerDestination() {
+ return new ActiveMQQueue("A.B");
+ }
+
+ protected TextMessage createMessage(Session session, String testid) throws JMSException {
+ TextMessage textMessage = session.createTextMessage("testMessage");
+ textMessage.setStringProperty("testid", testid);
+ return textMessage;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ answer.setPersistent(isPersistent());
+ answer.getManagementContext().setCreateConnector(false);
+ answer.addConnector(bindAddress);
+ /*
+ * <destinationInterceptors> <virtualDestinationInterceptor>
+ * <virtualDestinations> <compositeQueue name="A.B" forwardOnly="false">
+ * <forwardTo> <filteredDestination selector="testid LIKE 'test%'" queue="A.C"/>
+ * </forwardTo> </compositeQueue> </virtualDestinations>
+ * </virtualDestinationInterceptor> </destinationInterceptors>
+ */
+ /*
+ * SendWhenNotMatched = true A message will be always forwarded to if not matched to filtered destination
+ * ForwardOnly setting has no impact
+ */
+
+ CompositeQueue compositeQueue = new CompositeQueue();
+ compositeQueue.setName("A.B");
+ compositeQueue.setForwardOnly(true); // By default it is true
+ compositeQueue.setSendWhenNotMatched(true);// By default it is false
+ FilteredDestination filteredQueue = new FilteredDestination();
+ filteredQueue.setQueue("A.C");
+ filteredQueue.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations = new ArrayList<Object>();
+ forwardDestinations.add(filteredQueue);
+ compositeQueue.setForwardTo(forwardDestinations);
+
+
+ CompositeQueue compositeQueue0 = new CompositeQueue();
+ compositeQueue0.setName("A.D");
+ compositeQueue0.setForwardOnly(false); // By default it is true
+ compositeQueue0.setSendWhenNotMatched(true);// By default it is false
+ FilteredDestination filteredQueue0 = new FilteredDestination();
+ filteredQueue0.setQueue("A.E");
+ filteredQueue0.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations0 = new ArrayList<Object>();
+ forwardDestinations0.add(filteredQueue0);
+ compositeQueue0.setForwardTo(forwardDestinations0);
+
+ //Back compatibility test 1
+ CompositeQueue compositeQueue01 = new CompositeQueue();
+ compositeQueue01.setName("A.X");
+ compositeQueue01.setForwardOnly(false); // By default it is true
+ //compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+ FilteredDestination filteredQueue01 = new FilteredDestination();
+ filteredQueue01.setQueue("A.Y");
+ filteredQueue01.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations01 = new ArrayList<Object>();
+ forwardDestinations01.add(filteredQueue01);
+ compositeQueue01.setForwardTo(forwardDestinations01);
+
+ //Back compatibility test 2
+ CompositeQueue compositeQueue02 = new CompositeQueue();
+ compositeQueue02.setName("A.W");
+ //compositeQueue02.setForwardOnly(true); // By default it is true
+ //compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+ FilteredDestination filteredQueue02 = new FilteredDestination();
+ filteredQueue02.setQueue("A.V");
+ filteredQueue02.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations02 = new ArrayList<Object>();
+ forwardDestinations02.add(filteredQueue02);
+ compositeQueue02.setForwardTo(forwardDestinations02);
+
+ CompositeQueue compositeQueue1 = new CompositeQueue();
+ compositeQueue1.setName("X.Y");
+ compositeQueue1.setForwardOnly(true); // By default it is true
+ ActiveMQQueue forwardQueue1 =new ActiveMQQueue("X.Z");
+ final ArrayList<Object> forwardDestinations1 = new ArrayList<Object>();
+ forwardDestinations1.add(forwardQueue1);
+ compositeQueue1.setForwardTo(forwardDestinations1);
+
+
+ CompositeQueue compositeQueue2 = new CompositeQueue();
+ compositeQueue2.setName("R.S");
+ compositeQueue2.setForwardOnly(false);
+ ActiveMQQueue forwardQueue2 =new ActiveMQQueue("R.T");
+ final ArrayList<Object> forwardDestinations2 = new ArrayList<Object>();
+ forwardDestinations2.add(forwardQueue2);
+ compositeQueue2.setForwardTo(forwardDestinations2);
+
+ VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+ interceptor.setVirtualDestinations(new VirtualDestination[] { compositeQueue,compositeQueue0,compositeQueue01,compositeQueue02,compositeQueue1,compositeQueue2 });
+ answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
+ return answer;
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
new file mode 100644
index 0000000..69c734e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.MessageId;
+import org.junit.Test;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase {
+ //private static final Logger LOG = LoggerFactory.getLogger(ActiveDurableSubscriptionBrowseExpireTest.class);
+ private boolean enableExpiration = true;
+
+ public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) {
+ keepDurableSubsActive = true;
+ this.enableExpiration = enableExpiration;
+ }
+
+ @Parameterized.Parameters(name = "enableExpiration_{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { false },
+ { true }
+ });
+ }
+
+ @Override
+ public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+ return super.setPersistenceAdapter(broker, PersistenceAdapterChoice.MEM);
+ }
+
+ @Override
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+ connectionFactory.setWatchTopicAdvisories(false);
+ return connectionFactory;
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testBrowseExpireActiveSub() throws Exception {
+ final int numberOfMessages = 10;
+
+ broker.setEnableMessageExpirationOnActiveDurableSubs(enableExpiration);
+
+ // create durable subscription
+ Connection con = createConnection("consumer");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId");
+
+ long timeStamp = System.currentTimeMillis();
+ sendMessages(numberOfMessages, timeStamp);
+
+ ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
+ assertEquals(1, subs.length);
+
+ ObjectName subName = subs[0];
+ DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+ assertEquals(true, sub.isActive());
+
+ // browse the durable sub
+ CompositeData[] data = sub.browse();
+ assertNotNull(data);
+ assertEquals(numberOfMessages, data.length);
+
+ Destination dest = broker.getDestination(topic);
+ assertEquals(0, dest.getDestinationStatistics().getExpired().getCount());
+
+ // add every 3rd message to the expiration list
+ TopicMessageStore topicStore = (TopicMessageStore)dest.getMessageStore();
+ LinkedList<org.apache.activemq.command.Message> messagesToExpire = new LinkedList<>();
+ topicStore.recover(new MessageRecoveryListener() {
+ @Override
+ public boolean recoverMessage(org.apache.activemq.command.Message message) throws Exception {
+ int index = (int)message.getProperty("index");
+ if(index % 3 == 0)
+ messagesToExpire.add(message);
+ return true;
+ }
+
+ @Override
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception {
+ return true;
+ }
+
+ @Override
+ public boolean hasSpace() {
+
+ return true;
+ }
+
+ @Override
+ public boolean isDuplicate(MessageId id) {
+ return false;
+ }
+ });
+
+ // expire messages in the topic store
+ for(org.apache.activemq.command.Message message: messagesToExpire) {
+ message.setExpiration(timeStamp - 1);
+ topicStore.updateMessage(message);
+ }
+
+ // browse (should | should not) expire the messages on the destination if expiration is (enabled | not enabled)
+ data = sub.browse();
+ assertNotNull(data);
+ assertEquals(enableExpiration ? messagesToExpire.size() : 0, dest.getDestinationStatistics().getExpired().getCount());
+
+ session.close();
+ con.close();
+ }
+
+ private void sendMessages(int numberOfMessages, long timeStamp) throws Exception {
+ Connection con = createConnection("producer");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ Message message = session.createMessage();
+ message.setIntProperty("index", i);
+ message.setJMSTimestamp(timeStamp);
+ producer.send(topic, message);
+ }
+
+ session.close();
+ con.close();
+ }
+}
diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml
index 70fa0be..01cacab 100644
--- a/activemq-web/pom.xml
+++ b/activemq-web/pom.xml
@@ -55,6 +55,10 @@
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-unit-tests</artifactId>
<scope>test</scope>
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
index 02e2b7a..5a2771b 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
@@ -19,6 +19,7 @@
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -34,6 +35,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.io.input.BoundedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +60,12 @@
public abstract class MessageServletSupport extends HttpServlet {
private static final transient Logger LOG = LoggerFactory.getLogger(MessageServletSupport.class);
+ /**
+ * A configuration tag to specify the maximum message size (in bytes) for the servlet. The default
+ * is given by DEFAULT_MAX_MESSAGE_SIZE below.
+ */
+ private static final String MAX_MESSAGE_SIZE_TAG = "maxMessageSize";
+ private static final Long DEFAULT_MAX_MESSAGE_SIZE = 100000L;
private boolean defaultTopicFlag = true;
private Destination defaultDestination;
@@ -68,6 +76,7 @@
private int defaultMessagePriority = 5;
private long defaultMessageTimeToLive;
private String destinationOptions;
+ private long maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
public void init(ServletConfig servletConfig) throws ServletException {
super.init(servletConfig);
@@ -91,6 +100,11 @@
}
}
+ String maxMessageSizeConfigured = servletConfig.getInitParameter(MAX_MESSAGE_SIZE_TAG);
+ if (maxMessageSizeConfigured != null) {
+ maxMessageSize = Long.parseLong(maxMessageSizeConfigured);
+ }
+
// lets check to see if there's a connection factory set
WebClient.initContext(getServletContext());
}
@@ -344,7 +358,8 @@
if (answer == null && contentType != null) {
LOG.debug("Content-Type={}", contentType);
// lets read the message body instead
- BufferedReader reader = request.getReader();
+ BoundedInputStream boundedInputStream = new BoundedInputStream(request.getInputStream(), maxMessageSize);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(boundedInputStream));
StringBuilder buffer = new StringBuilder();
while (true) {
String line = reader.readLine();
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index b1067c0..17ac1f9 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -182,6 +182,7 @@
<include>${pom.groupId}:activeio-core</include>
<include>commons-beanutils:commons-beanutils</include>
<include>commons-collections:commons-collections</include>
+ <include>commons-io:commons-io</include>
<include>org.apache.commons:commons-dbcp2</include>
<include>org.apache.commons:commons-pool2</include>
<include>commons-codec:commons-codec</include>
diff --git a/assembly/src/release/bin/env b/assembly/src/release/bin/env
index 947807b..a31d687 100644
--- a/assembly/src/release/bin/env
+++ b/assembly/src/release/bin/env
@@ -1,4 +1,3 @@
-#!/bin/sh
# ------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
diff --git a/pom.xml b/pom.xml
index 9acd48f..db5d16d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@
<commons-beanutils-version>1.9.4</commons-beanutils-version>
<commons-collections-version>3.2.2</commons-collections-version>
<commons-daemon-version>1.2.3</commons-daemon-version>
- <commons-dbcp2-version>2.7.0</commons-dbcp2-version>
+ <commons-dbcp2-version>2.8.0</commons-dbcp2-version>
<commons-io-version>2.8.0</commons-io-version>
<commons-lang-version>2.6</commons-lang-version>
<commons-logging-version>1.2</commons-logging-version>