AMQ-7464 - ensure message.copy before server session run dispatch
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 5910634..bef6f4e 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -73,6 +73,7 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -882,7 +883,16 @@
         MessageDispatch messageDispatch;
         while ((messageDispatch = executor.dequeueNoWait()) != null) {
             final MessageDispatch md = messageDispatch;
-            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
+
+            // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
+            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
+            if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
+                ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
+            }
+            if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
+                ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
+                ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
+            }
 
             MessageAck earlyAck = null;
             if (message.isExpired()) {
@@ -951,7 +961,7 @@
                             @Override
                             public void afterRollback() throws Exception {
                                 if (LOG.isTraceEnabled()) {
-                                    LOG.trace("rollback {}", ack, new Throwable("here"));
+                                    LOG.trace("afterRollback {}", ack, new Throwable("here"));
                                 }
                                 // ensure we don't filter this as a duplicate
                                 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
@@ -979,6 +989,7 @@
                                     MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                     ack.setFirstMessageId(md.getMessage().getMessageId());
                                     ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
+                                    LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
                                     asyncSendPacket(ack);
 
                                 } else {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index a0a1ca8..5f325a4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -149,7 +149,7 @@
     /**
      * @throws Exception
      */
-    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
+    public void testNormalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
 
         // Receive a message with the JMS API
         RedeliveryPolicy policy = connection.getRedeliveryPolicy();
@@ -742,7 +742,7 @@
                 public void onMessage(Message message) {
                     try {
                         ActiveMQTextMessage m = (ActiveMQTextMessage) message;
-                        LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
+                        LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + ", redeliveryCount: "  + m.getRedeliveryCounter());
                         assertEquals("1st", m.getText());
                         assertEquals(receivedCount.get(), m.getRedeliveryCounter());
                         receivedCount.incrementAndGet();
@@ -802,6 +802,108 @@
 
     }
 
+
+    public void testRepeatedRedeliveryNoCommitForwardMessage() throws Exception {
+
+        connection.start();
+        Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = dlqSession.createProducer(destination);
+
+        // Send the messages
+        producer.send(dlqSession.createTextMessage("1st"));
+
+        dlqSession.commit();
+        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+        final MessageProducer forwardingProducer = dlqSession.createProducer(new ActiveMQQueue("TEST_FORWARD"));
+
+        // Send the messages
+
+        final int maxRedeliveries = 2;
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        for (int i=0;i<=maxRedeliveries+1;i++) {
+            connection = (ActiveMQConnection)factory.createConnection(userName, password);
+            connections.add(connection);
+
+            RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+            policy.setInitialRedeliveryDelay(0);
+            policy.setUseExponentialBackOff(false);
+            policy.setMaximumRedeliveries(maxRedeliveries);
+
+            connection.start();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
+            session.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        ActiveMQTextMessage m = (ActiveMQTextMessage) message;
+                        LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + " ,Redelivery: " + m.getRedeliveryCounter());
+                        assertEquals("1st", m.getText());
+                        assertEquals(receivedCount.get(), m.getRedeliveryCounter());
+                        receivedCount.incrementAndGet();
+
+                        // do a forward of the received message, will reset the messageID
+                        forwardingProducer.send(message);
+                        done.countDown();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+
+            connection.createConnectionConsumer(
+                    destination,
+                    null,
+                    new ServerSessionPool() {
+                        @Override
+                        public ServerSession getServerSession() throws JMSException {
+                            return new ServerSession() {
+                                @Override
+                                public Session getSession() throws JMSException {
+                                    return session;
+                                }
+
+                                @Override
+                                public void start() throws JMSException {
+                                }
+                            };
+                        }
+                    },
+                    100,
+                    false);
+
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    session.run();
+                    return done.await(10, TimeUnit.MILLISECONDS);
+                }
+            }, 5000);
+
+            if (i<=maxRedeliveries) {
+                assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
+            } else {
+                // final redelivery gets poisoned before dispatch
+                assertFalse("listener not done @" + i, done.await(5, TimeUnit.SECONDS));
+            }
+            connection.close();
+            connections.remove(connection);
+        }
+
+        // We should be able to get the message off the DLQ now.
+        TextMessage m = (TextMessage)dlqConsumer.receive(1000);
+        assertNotNull("Got message from DLQ", m);
+        assertEquals("1st", m.getText());
+        String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+        assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
+        dlqSession.commit();
+
+    }
+
     public void testRedeliveryRollbackWithDelayBlocking() throws Exception
     {
         redeliveryRollbackWithDelay(true);