AMQ-7464 - ensure message.copy before server session run dispatch
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 5910634..bef6f4e 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -73,6 +73,7 @@
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -882,7 +883,16 @@
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
- final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+
+ // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
+ final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
+ if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
+ ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
+ }
+ if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
+ ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
+ ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
+ }
MessageAck earlyAck = null;
if (message.isExpired()) {
@@ -951,7 +961,7 @@
@Override
public void afterRollback() throws Exception {
if (LOG.isTraceEnabled()) {
- LOG.trace("rollback {}", ack, new Throwable("here"));
+ LOG.trace("afterRollback {}", ack, new Throwable("here"));
}
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
@@ -979,6 +989,7 @@
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
+ LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
asyncSendPacket(ack);
} else {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index a0a1ca8..5f325a4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -149,7 +149,7 @@
/**
* @throws Exception
*/
- public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+ public void testNormalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
@@ -742,7 +742,7 @@
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
- LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
+ LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + ", redeliveryCount: " + m.getRedeliveryCounter());
assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet();
@@ -802,6 +802,108 @@
}
+
+ public void testRepeatedRedeliveryNoCommitForwardMessage() throws Exception {
+
+ connection.start();
+ Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = dlqSession.createProducer(destination);
+
+ // Send the messages
+ producer.send(dlqSession.createTextMessage("1st"));
+
+ dlqSession.commit();
+ MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+ final MessageProducer forwardingProducer = dlqSession.createProducer(new ActiveMQQueue("TEST_FORWARD"));
+
+ // Send the messages
+
+ final int maxRedeliveries = 2;
+ final AtomicInteger receivedCount = new AtomicInteger(0);
+
+ for (int i=0;i<=maxRedeliveries+1;i++) {
+ connection = (ActiveMQConnection)factory.createConnection(userName, password);
+ connections.add(connection);
+
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(0);
+ policy.setUseExponentialBackOff(false);
+ policy.setMaximumRedeliveries(maxRedeliveries);
+
+ connection.start();
+ final CountDownLatch done = new CountDownLatch(1);
+
+ final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
+ session.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+ LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + " ,Redelivery: " + m.getRedeliveryCounter());
+ assertEquals("1st", m.getText());
+ assertEquals(receivedCount.get(), m.getRedeliveryCounter());
+ receivedCount.incrementAndGet();
+
+ // do a forward of the received message, will reset the messageID
+ forwardingProducer.send(message);
+ done.countDown();
+ } catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
+ }
+ });
+
+ connection.createConnectionConsumer(
+ destination,
+ null,
+ new ServerSessionPool() {
+ @Override
+ public ServerSession getServerSession() throws JMSException {
+ return new ServerSession() {
+ @Override
+ public Session getSession() throws JMSException {
+ return session;
+ }
+
+ @Override
+ public void start() throws JMSException {
+ }
+ };
+ }
+ },
+ 100,
+ false);
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ session.run();
+ return done.await(10, TimeUnit.MILLISECONDS);
+ }
+ }, 5000);
+
+ if (i<=maxRedeliveries) {
+ assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
+ } else {
+ // final redelivery gets poisoned before dispatch
+ assertFalse("listener not done @" + i, done.await(5, TimeUnit.SECONDS));
+ }
+ connection.close();
+ connections.remove(connection);
+ }
+
+ // We should be able to get the message off the DLQ now.
+ TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+ assertNotNull("Got message from DLQ", m);
+ assertEquals("1st", m.getText());
+ String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+ assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+ dlqSession.commit();
+
+ }
+
public void testRedeliveryRollbackWithDelayBlocking() throws Exception
{
redeliveryRollbackWithDelay(true);