AMQ-8349 - Ensure virtual destination consumer advisories are only
replayed to new advisory consumers and not existing
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index f2d41e9..5b722d8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -209,7 +209,7 @@
for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) {
ConsumerInfo key = iter.next();
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination());
- fireConsumerAdvisory(context, key.getDestination(), topic, key);
+ fireConsumerAdvisory(context, key.getDestination(), topic, key, info.getConsumerId());
}
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index f305b24..a0f4bdb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -298,6 +298,30 @@
assertAdvisoryBrokerCounts(1,1,1);
}
+ // AMQ-8349 -Verify that replayed advisory messages for virtual consumers are only sent to
+ // new advisory consumers and not to existing by mistake
+ @Test(timeout = 30 * 1000)
+ public void testAdvisoryReplayMultipleAdvisoryConsumers() throws Exception {
+ Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
+ ActiveMQQueue activemq = new ActiveMQQueue("include.test.bar.bridge");
+
+ //configure a virtual destination that forwards messages from topic testQueueName
+ //to queue "include.test.bar.bridge"
+ CompositeTopic compositeTopic = createCompositeTopic(testTopicName, activemq);
+ doSetUp(true, new VirtualDestination[] {compositeTopic}, true, true);
+ runtimeBroker.addNewDestination(activemq);
+
+ MessageConsumer advisoryConsumer1 = getVirtualDestinationAdvisoryConsumer(testTopicName);
+ //We should get 1 advisory replayed due to the virtual consumer demand for the composite queue
+ assertRemoteAdvisoryCount(advisoryConsumer1, 1);
+
+ MessageConsumer advisoryConsumer2 = getVirtualDestinationAdvisoryConsumer(testTopicName);
+ //We should get 1 advisory replayed on the new consumer but the existing consumer should not
+ //receive any messages as the advisory was already received before
+ assertRemoteAdvisoryCount(advisoryConsumer1, 0);
+ assertRemoteAdvisoryCount(advisoryConsumer2, 1);
+ }
+
/**
* Test that dynamic flow works for virtual destinations when a second composite
* topic is included that forwards to the same queue, but is excluded from
@@ -1048,8 +1072,8 @@
Thread.sleep(2000);
- //there should be an extra advisory because of replay
- assertRemoteAdvisoryCount(advisoryConsumer, 2);
+ //there should still only be 1 advisory
+ assertRemoteAdvisoryCount(advisoryConsumer, 1);
assertAdvisoryBrokerCounts(1,1,1);
}
@@ -1083,9 +1107,9 @@
includedProducer.send(test);
assertNotNull(bridgeConsumer.receive(5000));
- //with isUseVirtualDestSubsOnCreation is true, there should be 4 advisories (2 replay)
- //with !isUseVirtualDestSubsOnCreation, there should be 2 advisories (1 replay)
- assertRemoteAdvisoryCount(advisoryConsumer, 4, 2);
+ //with isUseVirtualDestSubsOnCreation is true, there should be 2 advisories
+ //with !isUseVirtualDestSubsOnCreation, there should be 1 advisory
+ assertRemoteAdvisoryCount(advisoryConsumer, 2, 1);
if (isUseVirtualDestSubsOnCreation) {
assertAdvisoryBrokerCounts(1,2,1);
} else {