https://issues.apache.org/jira/browse/AMQNET-513
Preserve the rollback cause when poisoning a message.
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 6f04946..74d07a1 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -883,10 +883,10 @@
}
catch(Exception e)
{
+ dispatch.RollbackCause = e;
if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
{
// Schedule redelivery and possible dlq processing
- dispatch.RollbackCause = e;
Rollback();
}
else
@@ -1094,9 +1094,9 @@
}
}
- private bool ConsumeExpiredMessage(MessageDispatch dispatch)
+ private bool ConsumeExpiredMessage(MessageDispatch dispatch)
{
- if (dispatch.Message.IsExpired())
+ if (dispatch.Message.IsExpired())
{
return !info.Browser && !IgnoreExpiration;
}
@@ -1480,7 +1480,9 @@
cause.ExceptionClass = "javax.jms.JMSException";
cause.Message = lastMd.RollbackCause.Message;
poisonCause.Cause = cause;
+ poisonCause.Message = poisonCause.Message + " cause: " + lastMd.RollbackCause.Message;
}
+
ack.FirstMessageId = firstMsgId;
ack.PoisonCause = poisonCause;
diff --git a/src/test/csharp/MessageListenerRedeliveryTest.cs b/src/test/csharp/MessageListenerRedeliveryTest.cs
index 0b22b7a..17d4293 100644
--- a/src/test/csharp/MessageListenerRedeliveryTest.cs
+++ b/src/test/csharp/MessageListenerRedeliveryTest.cs
@@ -15,7 +15,11 @@
* limitations under the License.
*/
+using System;
using System.Threading;
+using System.Collections;
+using Apache.NMS;
+using Apache.NMS.Util;
using Apache.NMS.Test;
using Apache.NMS.Policies;
using NUnit.Framework;
@@ -26,15 +30,29 @@
public class MessageListenerRedeliveryTest : NMSTestSupport
{
private Connection connection;
- private int counter;
+ private volatile int counter;
private ISession session;
+ private ArrayList received;
+ private ArrayList dlqMessages;
+ private int maxDeliveries;
+
+ private CountDownLatch gotOneMessage;
+ private CountDownLatch gotTwoMessages;
+ private CountDownLatch gotOneDlqMessage;
+ private CountDownLatch gotMaxRedeliveries;
[SetUp]
public override void SetUp()
{
this.connection = (Connection) CreateConnection();
this.connection.RedeliveryPolicy = GetRedeliveryPolicy();
-
+ this.gotOneMessage = new CountDownLatch(1);
+ this.gotTwoMessages = new CountDownLatch(2);
+ this.gotOneDlqMessage = new CountDownLatch(1);
+ this.maxDeliveries = GetRedeliveryPolicy().MaximumRedeliveries;
+ this.gotMaxRedeliveries = new CountDownLatch(maxDeliveries);
+ this.received = new ArrayList();
+ this.dlqMessages = new ArrayList();
this.counter = 0;
}
@@ -76,6 +94,39 @@
}
}
+ private void OnTracedReceiveMessage(IMessage message)
+ {
+ try
+ {
+ received.Add(((ITextMessage) message).Text);
+ }
+ catch (Exception e)
+ {
+ Assert.Fail("Error: " + e.Message);
+ }
+
+ if (++counter < maxDeliveries)
+ {
+ throw new Exception("force a redelivery");
+ }
+
+ // new blood
+ counter = 0;
+ gotTwoMessages.countDown();
+ }
+
+ private void OnDlqMessage(IMessage message)
+ {
+ dlqMessages.Add(message);
+ gotOneDlqMessage.countDown();
+ }
+
+ private void OnRedeliveredMessage(IMessage message)
+ {
+ gotMaxRedeliveries.countDown();
+ throw new Exception("Test Forcing a Rollback");
+ }
+
[Test]
public void TestQueueRollbackConsumerListener()
{
@@ -91,7 +142,7 @@
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += new MessageListener(OnMessageListener);
-
+
Thread.Sleep(500);
// first try.. should get 2 since there is no delay on the
@@ -125,6 +176,191 @@
session.Close();
}
-
+
+ [Test]
+ public void TestQueueRollbackSessionListener()
+ {
+ connection.Start();
+
+ this.session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.CreateTemporaryQueue();
+ IMessageProducer producer = CreateProducer(session, queue);
+ IMessage message = CreateTextMessage(session);
+ producer.Send(message);
+ session.Commit();
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += new MessageListener(OnMessageListener);
+
+ Thread.Sleep(1000);
+
+ // first try
+ Assert.AreEqual(2, counter);
+
+ Thread.Sleep(1500);
+
+ // second try (redelivery after 1 sec)
+ Assert.AreEqual(3, counter);
+
+ Thread.Sleep(3000);
+
+ // third try (redelivery after 2 seconds) - it should give up after that
+ Assert.AreEqual(4, counter);
+
+ // create new message
+ producer.Send(CreateTextMessage(session));
+ session.Commit();
+
+ Thread.Sleep(1000);
+
+ // it should be committed, so no redelivery
+ Assert.AreEqual(5, counter);
+
+ Thread.Sleep(2000);
+
+ // no redelivery, counter should still be 4
+ Assert.AreEqual(5, counter);
+
+ session.Close();
+ }
+
+ [Test]
+ public void TestQueueSessionListenerExceptionRetry()
+ {
+ connection.Start();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.CreateTemporaryQueue();
+ IMessageProducer producer = CreateProducer(session, queue);
+ IMessage message = CreateTextMessage(session, "1");
+ producer.Send(message);
+ message = CreateTextMessage(session, "2");
+ producer.Send(message);
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += new MessageListener(OnTracedReceiveMessage);
+
+ Assert.IsTrue(gotTwoMessages.await(TimeSpan.FromSeconds(20)), "got message before retry expiry");
+
+ for (int i = 0; i < maxDeliveries; i++)
+ {
+ Assert.AreEqual("1", received[i], "got first redelivered: " + i);
+ }
+ for (int i = maxDeliveries; i < maxDeliveries * 2; i++)
+ {
+ Assert.AreEqual("2", received[i], "got first redelivered: " + i);
+ }
+
+ session.Close();
+ }
+
+ [Test]
+ public void TestQueueSessionListenerExceptionDlq()
+ {
+ connection.Start();
+
+ session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.CreateTemporaryQueue();
+ IMessageProducer producer = CreateProducer(session, queue);
+ IMessage message = CreateTextMessage(session);
+ producer.Send(message);
+
+ IDestination dlqDestination = session.GetQueue("ActiveMQ.DLQ");
+ connection.DeleteDestination(dlqDestination);
+ IMessageConsumer dlqConsumer = session.CreateConsumer(dlqDestination);
+ dlqConsumer.Listener += new MessageListener(OnDlqMessage);
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += new MessageListener(OnRedeliveredMessage);
+
+ Assert.IsTrue(gotMaxRedeliveries.await(TimeSpan.FromSeconds(20)), "got message before retry expiry");
+
+ // check DLQ
+ Assert.IsTrue(gotOneDlqMessage.await(TimeSpan.FromSeconds(20)), "got dlq message");
+
+ // check DLQ message cause is captured
+ message = dlqMessages[0] as IMessage;
+ Assert.IsNotNull(message, "dlq message captured");
+ String cause = message.Properties.GetString("dlqDeliveryFailureCause");
+
+ Assert.IsTrue(cause.Contains("JMSException"), "cause 'cause' exception is remembered");
+ Assert.IsTrue(cause.Contains("Test"), "is correct exception");
+ Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause policy is remembered");
+
+ session.Close();
+ }
+
+ private void OnMessageThenRollback(IMessage message)
+ {
+ gotOneMessage.countDown();
+ try
+ {
+ session.Rollback();
+ }
+ catch (Exception)
+ {
+ }
+
+ throw new Exception("Test force a redelivery");
+ }
+
+ [Test]
+ public void TestTransactedQueueSessionListenerExceptionDlq()
+ {
+ connection.Start();
+
+ session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.CreateTemporaryQueue();
+ IMessageProducer producer = CreateProducer(session, queue);
+ IMessage message = CreateTextMessage(session);
+ producer.Send(message);
+ session.Commit();
+
+ IDestination dlqDestination = session.GetQueue("ActiveMQ.DLQ");
+ connection.DeleteDestination(dlqDestination);
+ IMessageConsumer dlqConsumer = session.CreateConsumer(dlqDestination);
+ dlqConsumer.Listener += new MessageListener(OnDlqMessage);
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += new MessageListener(OnMessageThenRollback);
+
+ Assert.IsTrue(gotOneMessage.await(TimeSpan.FromSeconds(20)), "got message before retry expiry");
+
+ // check DLQ
+ Assert.IsTrue(gotOneDlqMessage.await(TimeSpan.FromSeconds(20)), "got dlq message");
+
+ // check DLQ message cause is captured
+ message = dlqMessages[0] as IMessage;
+ Assert.IsNotNull(message, "dlq message captured");
+ String cause = message.Properties.GetString("dlqDeliveryFailureCause");
+
+ Assert.IsTrue(cause.Contains("JMSException"), "cause 'cause' exception is remembered");
+ Assert.IsTrue(cause.Contains("Test force"), "is correct exception");
+ Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause policy is remembered");
+
+ session.Close();
+ }
+
+ private ITextMessage CreateTextMessage(ISession session, String text)
+ {
+ return session.CreateTextMessage(text);
+ }
+
+ private ITextMessage CreateTextMessage(ISession session)
+ {
+ return session.CreateTextMessage("Hello");
+ }
+
+ private IMessageProducer CreateProducer(ISession session, IDestination queue)
+ {
+ IMessageProducer producer = session.CreateProducer(queue);
+ producer.DeliveryMode = GetDeliveryMode();
+ return producer;
+ }
+
+ protected MsgDeliveryMode GetDeliveryMode()
+ {
+ return MsgDeliveryMode.Persistent;
+ }
}
}