[AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy
(cherry picked from commit 28f7eb7ee87c47e43cc3db11fcd550ef872327b3)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
index fa6532b..82e0756 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
@@ -31,13 +31,12 @@
private boolean processNonPersistent = false;
private boolean processExpired = true;
private boolean enableAudit = true;
- private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
private long expiration;
@Override
public void rollback(Message message) {
if (message != null && this.enableAudit) {
- messageAudit.rollback(message);
+ lookupActiveMQMessageAudit(message).rollback(message);
}
}
@@ -46,7 +45,7 @@
boolean result = false;
if (message != null) {
result = true;
- if (enableAudit && messageAudit.isDuplicate(message)) {
+ if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) {
result = false;
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
}
@@ -108,20 +107,13 @@
this.expiration = expiration;
}
- public int getMaxProducersToAudit() {
- return messageAudit.getMaximumNumberOfProducersToTrack();
- }
+ public abstract int getMaxProducersToAudit();
- public void setMaxProducersToAudit(int maxProducersToAudit) {
- messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
- }
+ public abstract void setMaxProducersToAudit(int maxProducersToAudit);
- public void setMaxAuditDepth(int maxAuditDepth) {
- messageAudit.setAuditDepth(maxAuditDepth);
- }
+ public abstract void setMaxAuditDepth(int maxAuditDepth);
- public int getMaxAuditDepth() {
- return messageAudit.getAuditDepth();
- }
+ public abstract int getMaxAuditDepth();
+ protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
index 1dfaa15..3dd41ae 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
@@ -23,6 +24,7 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
+import org.apache.activemq.util.LRUCache;
/**
* A {@link DeadLetterStrategy} where each destination has its own individual
@@ -40,6 +42,10 @@
private boolean useQueueForQueueMessages = true;
private boolean useQueueForTopicMessages = true;
private boolean destinationPerDurableSubscriber;
+ private int maxAuditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+ private int maxProducersToAudit = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+
+ private final LRUCache<String,ActiveMQMessageAudit> dedicatedMessageAudits = new LRUCache<>(10_000);
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
if (message.getDestination().isQueue()) {
@@ -51,6 +57,13 @@
// Properties
// -------------------------------------------------------------------------
+ public int getMaxDestinationsToAudit() {
+ return dedicatedMessageAudits.getMaxCacheSize();
+ }
+
+ public void maxDestinationsToAudit(int maxDestinationsToAudit) {
+ this.dedicatedMessageAudits.setMaxCacheSize(maxDestinationsToAudit);
+ }
public String getQueuePrefix() {
return queuePrefix;
@@ -134,6 +147,26 @@
this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
}
+ @Override
+ public int getMaxProducersToAudit() {
+ return this.maxProducersToAudit;
+ }
+
+ @Override
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ }
+
+ @Override
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ }
+
+ @Override
+ public int getMaxAuditDepth() {
+ return this.maxAuditDepth;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQDestination createDestination(Message message,
@@ -168,4 +201,19 @@
}
}
+ @Override
+ protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
+ ActiveMQMessageAudit messageAudit;
+
+ synchronized(dedicatedMessageAudits) {
+ messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName());
+
+ if(messageAudit == null) {
+ messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
+ dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit);
+ }
+
+ return messageAudit;
+ }
+ }
}
\ No newline at end of file
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
index 41f1f10..8a78e83 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -35,6 +36,7 @@
public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ";
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
+ private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
return deadLetterQueue;
@@ -48,4 +50,29 @@
this.deadLetterQueue = deadLetterQueue;
}
+ @Override
+ public int getMaxProducersToAudit() {
+ return messageAudit.getMaximumNumberOfProducersToTrack();
+ }
+
+ @Override
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
+ }
+
+ @Override
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ messageAudit.setAuditDepth(maxAuditDepth);
+ }
+
+ @Override
+ public int getMaxAuditDepth() {
+ return messageAudit.getAuditDepth();
+ }
+
+ @Override
+ protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
+ return messageAudit;
+ }
+
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
index 5dc4ae7..2b51ffa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
@@ -16,22 +16,31 @@
*/
package org.apache.activemq.broker.policy;
+import java.util.Arrays;
import java.util.Enumeration;
+import java.util.Set;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,12 +57,37 @@
strategy.setProcessNonPersistent(true);
policy.setDeadLetterStrategy(strategy);
+ PolicyEntry indvAuditPolicy = new PolicyEntry();
+ IndividualDeadLetterStrategy indvAuditDlqStrategy = new IndividualDeadLetterStrategy();
+ indvAuditDlqStrategy.setEnableAudit(true);
+ indvAuditPolicy.setDeadLetterStrategy(indvAuditDlqStrategy);
+
+ PolicyEntry shrAuditPolicy = new PolicyEntry();
+ SharedDeadLetterStrategy shrAuditDlqStrategy = new SharedDeadLetterStrategy();
+ shrAuditDlqStrategy.setEnableAudit(true);
+ shrAuditPolicy.setDeadLetterStrategy(shrAuditDlqStrategy);
+
PolicyMap pMap = new PolicyMap();
pMap.put(new ActiveMQQueue(getDestinationString()), policy);
pMap.put(new ActiveMQTopic(getDestinationString()), policy);
+ pMap.put(new ActiveMQQueue(getDestinationString() + ".INDV.>"), indvAuditPolicy);
+ pMap.put(new ActiveMQQueue(getDestinationString() + ".SHR.>"), shrAuditPolicy);
broker.setDestinationPolicy(pMap);
+ CompositeQueue indvAuditCompQueue = new CompositeQueue();
+ indvAuditCompQueue.setName(getDestinationString() + ".INDV.A");
+ indvAuditCompQueue.setForwardOnly(true);
+ indvAuditCompQueue.setForwardTo(Arrays.asList(new ActiveMQQueue(getDestinationString() + ".INDV.B"), new ActiveMQQueue(getDestinationString() + ".INDV.C")));
+
+ CompositeQueue sharedAuditCompQueue = new CompositeQueue();
+ sharedAuditCompQueue.setName(getDestinationString() + ".SHR.A");
+ sharedAuditCompQueue.setForwardOnly(true);
+ sharedAuditCompQueue.setForwardTo(Arrays.asList(new ActiveMQQueue(getDestinationString() + ".SHR.B"), new ActiveMQQueue(getDestinationString() + ".SHR.C")));
+
+ VirtualDestinationInterceptor vdi = new VirtualDestinationInterceptor();
+ vdi.setVirtualDestinations(new VirtualDestination[] { indvAuditCompQueue, sharedAuditCompQueue });
+ broker.setDestinationInterceptors(new VirtualDestinationInterceptor[] {vdi});
return broker;
}
@@ -99,6 +133,99 @@
assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
}
+ // AMQ-9217
+ public void testPerDestinationAuditDefault() throws Exception {
+ ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+ rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+
+ connection.start();
+ session = connection.createSession(transactedMode, acknowledgeMode);
+ MessageProducer messageProducerA = session.createProducer(session.createQueue(getDestinationString() + ".INDV.A"));
+ messageProducerA.send(session.createTextMessage("testPerDestinationAuditEnabled"));
+ session.commit();
+
+ for(String destName : Set.of(getDestinationString() + ".INDV.B", getDestinationString() + ".INDV.C")) {
+ for (int i = 0; i < rollbackCount; i++) {
+ MessageConsumer indvConsumer = session.createConsumer(session.createQueue(destName));
+ Message message = indvConsumer.receive(5000);
+ assertNotNull("No message received: ", message);
+
+ session.rollback();
+ LOG.info("Rolled back: " + rollbackCount + " times");
+ indvConsumer.close();
+ }
+ }
+
+ QueueViewMBean a = getProxyToQueue(getDestinationString() + ".INDV.A");
+ assertNotNull(a);
+ assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250));
+
+ QueueViewMBean b = getProxyToQueue(getDestinationString() + ".INDV.B");
+ assertNotNull(b);
+ assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250));
+
+ QueueViewMBean c = getProxyToQueue(getDestinationString() + ".INDV.C");
+ assertNotNull(c);
+ assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250));
+
+ QueueViewMBean bDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + getDestinationString() + ".INDV.B");
+ assertNotNull(bDlq);
+ assertTrue(Wait.waitFor(() -> bDlq.getEnqueueCount() == 1l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> bDlq.getQueueSize() == 1l, 3000, 250));
+
+ QueueViewMBean cDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + getDestinationString() + ".INDV.C");
+ assertNotNull(cDlq);
+ assertTrue(Wait.waitFor(() -> cDlq.getEnqueueCount() == 1, 3000, 250));
+ assertTrue(Wait.waitFor(() -> cDlq.getQueueSize() == 1, 3000, 250));
+ }
+
+ public void testSharedDestinationAuditDropsMessages() throws Exception {
+ ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+ rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+
+ connection.start();
+ session = connection.createSession(transactedMode, acknowledgeMode);
+ MessageProducer messageProducerA = session.createProducer(session.createQueue(getDestinationString() + ".SHR.A"));
+ messageProducerA.send(session.createTextMessage("testSharedDestinationAuditDropsMessages"));
+ session.commit();
+
+ for(String destName : Set.of(getDestinationString() + ".SHR.B", getDestinationString() + ".SHR.C")) {
+ for (int i = 0; i < rollbackCount; i++) {
+ MessageConsumer shrConsumer = session.createConsumer(session.createQueue(destName));
+ Message message = shrConsumer.receive(5000);
+ assertNotNull("No message received: ", message);
+
+ session.rollback();
+ LOG.info("Rolled back: " + rollbackCount + " times");
+ shrConsumer.close();
+ }
+ }
+
+ QueueViewMBean a = getProxyToQueue(getDestinationString() + ".SHR.A");
+ assertNotNull(a);
+ assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250));
+
+ QueueViewMBean b = getProxyToQueue(getDestinationString() + ".SHR.B");
+ assertNotNull(b);
+ assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250));
+
+ QueueViewMBean c = getProxyToQueue(getDestinationString() + ".SHR.C");
+ assertNotNull(c);
+ assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250));
+ assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250));
+
+ // Only 1 message in 1 DLQ means the a message was dropped due to shared message audit
+ QueueViewMBean sharedDlq = getProxyToQueue("ActiveMQ.DLQ");
+ assertNotNull(sharedDlq);
+ assertTrue(Wait.waitFor(() -> sharedDlq.getEnqueueCount() == 1, 3000, 250));
+ assertTrue(Wait.waitFor(() -> sharedDlq.getQueueSize() == 1, 3000, 250));
+ }
+
protected void browseDlq() throws Exception {
Enumeration<?> messages = dlqBrowser.getEnumeration();
while (messages.hasMoreElements()) {