AMQ-8349 - Ensure virtual destination consumer advisories are only
replayed to new advisory consumers and not existing
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index f2d41e9..5b722d8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -209,7 +209,7 @@
                 for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) {
                     ConsumerInfo key = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination());
-                    fireConsumerAdvisory(context, key.getDestination(), topic, key);
+                    fireConsumerAdvisory(context, key.getDestination(), topic, key, info.getConsumerId());
               }
             }
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index f305b24..a0f4bdb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -298,6 +298,30 @@
         assertAdvisoryBrokerCounts(1,1,1);
     }
 
+    // AMQ-8349 -Verify that replayed advisory messages for virtual consumers are only sent to
+    // new advisory consumers and not to existing by mistake
+    @Test(timeout = 30 * 1000)
+    public void testAdvisoryReplayMultipleAdvisoryConsumers() throws Exception {
+        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
+        ActiveMQQueue activemq = new ActiveMQQueue("include.test.bar.bridge");
+
+        //configure a virtual destination that forwards messages from topic testQueueName
+        //to queue "include.test.bar.bridge"
+        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, activemq);
+        doSetUp(true, new VirtualDestination[] {compositeTopic}, true, true);
+        runtimeBroker.addNewDestination(activemq);
+
+        MessageConsumer advisoryConsumer1 = getVirtualDestinationAdvisoryConsumer(testTopicName);
+        //We should get 1 advisory replayed due to the virtual consumer demand for the composite queue
+        assertRemoteAdvisoryCount(advisoryConsumer1, 1);
+
+        MessageConsumer advisoryConsumer2 = getVirtualDestinationAdvisoryConsumer(testTopicName);
+        //We should get 1 advisory replayed on the new consumer but the existing consumer should not
+        //receive any messages as the advisory was already received before
+        assertRemoteAdvisoryCount(advisoryConsumer1, 0);
+        assertRemoteAdvisoryCount(advisoryConsumer2, 1);
+    }
+
     /**
      * Test that dynamic flow works for virtual destinations when a second composite
      * topic is included that forwards to the same queue, but is excluded from
@@ -1048,8 +1072,8 @@
 
         Thread.sleep(2000);
 
-        //there should be an extra advisory because of replay
-        assertRemoteAdvisoryCount(advisoryConsumer, 2);
+        //there should still only be 1 advisory
+        assertRemoteAdvisoryCount(advisoryConsumer, 1);
         assertAdvisoryBrokerCounts(1,1,1);
     }
 
@@ -1083,9 +1107,9 @@
         includedProducer.send(test);
         assertNotNull(bridgeConsumer.receive(5000));
 
-        //with isUseVirtualDestSubsOnCreation is true, there should be 4 advisories (2 replay)
-        //with !isUseVirtualDestSubsOnCreation, there should be 2 advisories (1 replay)
-        assertRemoteAdvisoryCount(advisoryConsumer, 4, 2);
+        //with isUseVirtualDestSubsOnCreation is true, there should be 2 advisories
+        //with !isUseVirtualDestSubsOnCreation, there should be 1 advisory
+        assertRemoteAdvisoryCount(advisoryConsumer, 2, 1);
         if (isUseVirtualDestSubsOnCreation) {
             assertAdvisoryBrokerCounts(1,2,1);
         } else {