Merge pull request #422 from fpapon/AMQ-7351

[AMQ-7351] Update to Apache pom parent 21
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 1508c61..f2d41e9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -229,8 +229,10 @@
     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         super.addProducer(context, info);
 
-        // Don't advise advisory topics.
-        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+        //Verify destination is either non-null or that we want to advise anonymous producers on null destination
+        //Don't advise advisory topics.
+        if ((info.getDestination() != null || getBrokerService().isAnonymousProducerAdvisorySupport())
+                && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
             fireProducerAdvisory(context, info.getDestination(), topic, info);
             producers.put(info.getProducerId(), info);
@@ -412,12 +414,13 @@
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         super.removeProducer(context, info);
 
-        // Don't advise advisory topics.
+        //Verify destination is either non-null or that we want to advise anonymous producers on null destination
+        //Don't advise advisory topics.
         ActiveMQDestination dest = info.getDestination();
-        if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
+        if ((dest != null || getBrokerService().isAnonymousProducerAdvisorySupport()) && !AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
             producers.remove(info.getProducerId());
-            if (!dest.isTemporary() || destinations.containsKey(dest)) {
+            if (dest == null || !dest.isTemporary() || destinations.containsKey(dest)) {
                 fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
             }
         }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0b05d31..df27da1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -192,6 +192,7 @@
     // to other jms messaging systems
     private boolean deleteAllMessagesOnStartup;
     private boolean advisorySupport = true;
+    private boolean anonymousProducerAdvisorySupport = false;
     private URI vmConnectorURI;
     private String defaultSocketURIString;
     private PolicyMap destinationPolicy;
@@ -1522,6 +1523,14 @@
         this.advisorySupport = advisorySupport;
     }
 
+    public boolean isAnonymousProducerAdvisorySupport() {
+        return anonymousProducerAdvisorySupport;
+    }
+
+    public void setAnonymousProducerAdvisorySupport(boolean anonymousProducerAdvisorySupport) {
+        this.anonymousProducerAdvisorySupport = anonymousProducerAdvisorySupport;
+    }
+
     public List<TransportConnector> getTransportConnectors() {
         return new ArrayList<>(transportConnectors);
     }
diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index 402cad4..c030acd 100644
--- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -36,6 +36,7 @@
     public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
     public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
     public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Anonymous";
     public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
     public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer.";
     public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
@@ -137,7 +138,9 @@
 
     public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
         String prefix;
-        if (destination.isQueue()) {
+        if (destination == null) {
+            prefix = ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX;
+        } else if (destination.isQueue()) {
             prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
         } else {
             prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
@@ -146,7 +149,8 @@
     }
 
     private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
-        return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "&sbquo;"));
+        return destination != null ? new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "&sbquo;")):
+            new ActiveMQTopic(prefix);
     }
 
     public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
index 7ffb3e8..98ba8c9 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
@@ -253,6 +253,30 @@
         connectionSub.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
         Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
         assertNull("Shouldn't receive the message", msg);
+
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testWildcardRetainedSubscriptionLocked() throws Exception {
+        MQTT mqttPub = createMQTTConnection("pub", true);
+        mqttPub.setUserName("admin");
+        mqttPub.setPassword("admin");
+
+        getProxyToBroker().addTopic("one.foo");
+        BlockingConnection connectionPub = mqttPub.blockingConnection();
+        connectionPub.connect();
+        connectionPub.publish("one/foo", "test".getBytes(), QoS.AT_LEAST_ONCE, true);
+
+        MQTT mqttSub = createMQTTConnection("sub", true);
+        mqttSub.setUserName("user");
+        mqttSub.setPassword("password");
+        BlockingConnection connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+        connectionSub.subscribe(new Topic[]{new Topic("+/#", QoS.AT_LEAST_ONCE)});
+        Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
+        assertNull("Shouldn't receive the message", msg);
+
+        assertEquals(1, getProxyToTopic("one.foo").getEnqueueCount());
     }
 
     @Test(timeout = 60 * 1000)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index c65dc53..438d649 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -175,7 +175,6 @@
         assertNoMessagesLeft(connection2);
     }
 
-
     public void testProducerAdvisories() throws Exception {
 
         ActiveMQDestination queue = new ActiveMQQueue("test");
@@ -319,6 +318,105 @@
         assertNoMessagesLeft(connection1);
     }
 
+    public void testAnonymousProducerAdvisoriesTrue() throws Exception {
+        //turn on support for anonymous producers
+        broker.setAnonymousProducerAdvisorySupport(true);
+
+        ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null);
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName());
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        assertNoMessagesLeft(connection1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        //don't set a destination
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // We should get an advisory of the new producer.
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName());
+
+        // Close the second connection.
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the producer closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(r.getObjectId(), producerInfo2.getProducerId());
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, m1.getDestination().getPhysicalName());
+
+        assertNoMessagesLeft(connection2);
+    }
+
+    public void testAnonymousProducerAdvisoriesFalse() throws Exception {
+        broker.setAnonymousProducerAdvisorySupport(false);
+
+        assertAnonymousProducerAdvisoriesOff();
+    }
+
+    public void testAnonymousProducerAdvisoriesDefault() throws Exception {
+        //Default for now is to have anonymous producer advisories turned off
+        assertAnonymousProducerAdvisoriesOff();
+    }
+
+    private void assertAnonymousProducerAdvisoriesOff() throws Exception {
+        ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(null);
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, destination.getPhysicalName());
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        assertNoMessagesLeft(connection1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        //don't set a destination
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // We should NOT get an advisory of the new producer.
+        Message m1 = receiveMessage(connection1, 1000);
+        assertNull(m1);
+
+        assertNoMessagesLeft(connection2);
+    }
+
     public static Test suite() {
         return suite(AdvisoryBrokerTest.class);
     }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimitTest.java
similarity index 98%
rename from activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
rename to activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimitTest.java
index 459bec1..a42018e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimitTest.java
@@ -35,7 +35,7 @@
 import static org.junit.Assert.*;
 
 // https://issues.apache.org/jira/browse/AMQ-7302
-public class JmxOpPageInOnMemoryLimit {
+public class JmxOpPageInOnMemoryLimitTest {
 
     BrokerService broker;
     protected MBeanServer mbeanServer;