Ensure that a new pull command is sent when a message exceeds the redelivery max and is poisoned. 
Fixes [AMQNET-507]. (See https://issues.apache.org/jira/browse/AMQNET-507)

diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 41df167..48018f3 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -1065,6 +1065,22 @@
                                        ConsumerId, dispatch);
                     PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
                                         "policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+
+                    // Refresh the dispatch time
+                    dispatchTime = DateTime.Now;
+
+                    if(dispatchTime > deadline)
+                    {
+                        // Out of time.
+                        timeout = TimeSpan.Zero;
+                    }
+                    else
+                    {
+                        // Adjust the timeout to the remaining time.
+                        timeout = deadline - dispatchTime;
+                    }
+
+                    SendPullRequest((long) timeout.TotalMilliseconds);
                 }
                 else
                 {
diff --git a/src/test/csharp/ZeroPrefetchConsumerTest.cs b/src/test/csharp/ZeroPrefetchConsumerTest.cs
index b536d71..482557e 100644
--- a/src/test/csharp/ZeroPrefetchConsumerTest.cs
+++ b/src/test/csharp/ZeroPrefetchConsumerTest.cs
@@ -154,6 +154,54 @@
             Assert.IsNull(answer, "Should have not received a message!");
         }
 
+        [Test]
+        public void TestConsumerReceivePrefetchZeroRedeliveryZero()
+        {
+            const string QUEUE_NAME = "TEST.TestConsumerReceivePrefetchZeroRedeliveryZero";
+
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            {              
+                session.DeleteDestination(queue);
+
+                using (IMessageProducer producer = session.CreateProducer(queue))
+                {
+                    ITextMessage textMessage = session.CreateTextMessage("test Message");
+                    producer.Send(textMessage);
+                }
+            }
+
+            // consume and rollback - increase redelivery counter on message
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            using (IMessageConsumer consumer = session.CreateConsumer(queue))
+            {              
+                connection.Start();
+                IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.IsNotNull(message);
+                session.Rollback();
+            }
+
+            // try consume with timeout - expect it to timeout and return NULL message
+            using (Connection connection = CreateConnection() as Connection)
+            {
+                connection.PrefetchPolicy.All = 0;
+                connection.RedeliveryPolicy.MaximumRedeliveries = 0;
+                connection.Start();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue(QUEUE_NAME);
+
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                    Assert.IsNull(message);
+                }
+            }
+        }
+
         [SetUp]
         public override void SetUp()
         {