Merge pull request #422 from fpapon/AMQ-7351
[AMQ-7351] Update to Apache pom parent 21
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 1508c61..f2d41e9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -229,8 +229,10 @@
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info);
- // Don't advise advisory topics.
- if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+ //Verify destination is either non-null or that we want to advise anonymous producers on null destination
+ //Don't advise advisory topics.
+ if ((info.getDestination() != null || getBrokerService().isAnonymousProducerAdvisorySupport())
+ && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
fireProducerAdvisory(context, info.getDestination(), topic, info);
producers.put(info.getProducerId(), info);
@@ -412,12 +414,13 @@
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);
- // Don't advise advisory topics.
+ //Verify destination is either non-null or that we want to advise anonymous producers on null destination
+ //Don't advise advisory topics.
ActiveMQDestination dest = info.getDestination();
- if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
+ if ((dest != null || getBrokerService().isAnonymousProducerAdvisorySupport()) && !AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
producers.remove(info.getProducerId());
- if (!dest.isTemporary() || destinations.containsKey(dest)) {
+ if (dest == null || !dest.isTemporary() || destinations.containsKey(dest)) {
fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0b05d31..df27da1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -192,6 +192,7 @@
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
+ private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI;
private String defaultSocketURIString;
private PolicyMap destinationPolicy;
@@ -1522,6 +1523,14 @@
this.advisorySupport = advisorySupport;
}
+ public boolean isAnonymousProducerAdvisorySupport() {
+ return anonymousProducerAdvisorySupport;
+ }
+
+ public void setAnonymousProducerAdvisorySupport(boolean anonymousProducerAdvisorySupport) {
+ this.anonymousProducerAdvisorySupport = anonymousProducerAdvisorySupport;
+ }
+
public List<TransportConnector> getTransportConnectors() {
return new ArrayList<>(transportConnectors);
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index 402cad4..c030acd 100644
--- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -36,6 +36,7 @@
public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+ public static final String ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Anonymous";
public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer.";
public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
@@ -137,7 +138,9 @@
public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
String prefix;
- if (destination.isQueue()) {
+ if (destination == null) {
+ prefix = ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX;
+ } else if (destination.isQueue()) {
prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
} else {
prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
@@ -146,7 +149,8 @@
}
private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
- return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚"));
+ return destination != null ? new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚")):
+ new ActiveMQTopic(prefix);
}
public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
index 7ffb3e8..98ba8c9 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
@@ -253,6 +253,30 @@
connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
assertNull("Shouldn't receive the message", msg);
+
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testWildcardRetainedSubscriptionLocked() throws Exception {
+ MQTT mqttPub = createMQTTConnection("pub", true);
+ mqttPub.setUserName("admin");
+ mqttPub.setPassword("admin");
+
+ getProxyToBroker().addTopic("one.foo");
+ BlockingConnection connectionPub = mqttPub.blockingConnection();
+ connectionPub.connect();
+ connectionPub.publish("one/foo", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
+
+ MQTT mqttSub = createMQTTConnection("sub", true);
+ mqttSub.setUserName("user");
+ mqttSub.setPassword("password");
+ BlockingConnection connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(new Topic[]{new Topic("+/#", QoS.AT_LEAST_ONCE)});
+ Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
+ assertNull("Shouldn't receive the message", msg);
+
+ assertEquals(1, getProxyToTopic("one.foo").getEnqueueCount());
}
@Test(timeout = 60 * 1000)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index c65dc53..438d649 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -175,7 +175,6 @@
assertNoMessagesLeft(connection2);
}
-
public void testProducerAdvisories() throws Exception {
ActiveMQDestination queue = new ActiveMQQueue("test");
@@ -319,6 +318,105 @@
assertNoMessagesLeft(connection1);
}
+ public void testAnonymousProducerAdvisoriesTrue() throws Exception {
+ //turn on support for anonymous producers
+ broker.setAnonymousProducerAdvisorySupport(true);
+
+ ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null);
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName());
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(100);
+
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(consumerInfo1);
+
+ assertNoMessagesLeft(connection1);
+
+ // Setup a producer.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ //don't set a destination
+
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.send(producerInfo2);
+
+ // We should get an advisory of the new producer.
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ assertNotNull(m1.getDataStructure());
+ assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName());
+
+ // Close the second connection.
+ connection2.request(closeConnectionInfo(connectionInfo2));
+ connection2.stop();
+
+ // We should get an advisory of the producer closing
+ m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ assertNotNull(m1.getDataStructure());
+ RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+ assertEquals(r.getObjectId(), producerInfo2.getProducerId());
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName());
+
+ assertNoMessagesLeft(connection2);
+ }
+
+ public void testAnonymousProducerAdvisoriesFalse() throws Exception {
+ broker.setAnonymousProducerAdvisorySupport(false);
+
+ assertAnonymousProducerAdvisoriesOff();
+ }
+
+ public void testAnonymousProducerAdvisoriesDefault() throws Exception {
+ //Default for now is to have anonymous producer advisories turned off
+ assertAnonymousProducerAdvisoriesOff();
+ }
+
+ private void assertAnonymousProducerAdvisoriesOff() throws Exception {
+ ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null);
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName());
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(100);
+
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(consumerInfo1);
+
+ assertNoMessagesLeft(connection1);
+
+ // Setup a producer.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ //don't set a destination
+
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.send(producerInfo2);
+
+ // We should NOT get an advisory of the new producer: anonymous producer advisories are off.
+ Message m1 = receiveMessage(connection1, 1000);
+ assertNull(m1);
+
+ assertNoMessagesLeft(connection2);
+ }
+
public static Test suite() {
return suite(AdvisoryBrokerTest.class);
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimitTest.java
similarity index 98%
rename from activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
rename to activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimitTest.java
index 459bec1..a42018e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimitTest.java
@@ -35,7 +35,7 @@
import static org.junit.Assert.*;
// https://issues.apache.org/jira/browse/AMQ-7302
-public class JmxOpPageInOnMemoryLimit {
+public class JmxOpPageInOnMemoryLimitTest {
BrokerService broker;
protected MBeanServer mbeanServer;