Copy the ignoreExpiration implementation from the OpenWire provider.
Fixes [AMQNET-478]. (See https://issues.apache.org/jira/browse/AMQNET-478)

diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 75c912d..1f79a7a 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Specialized;
 using System.Threading;
 using System.Collections.Generic;
 using Apache.NMS.Stomp.Commands;
@@ -58,14 +59,45 @@
         private IRedeliveryPolicy redeliveryPolicy;
         private Exception failureError;
 
-        // Constructor internal to prevent clients from creating an instance.
-        internal MessageConsumer(Session session, ConsumerInfo info)
+		// Constructor internal to prevent clients from creating an instance.
+        internal MessageConsumer(Session session, ConsumerId id, Destination destination, string name, string selector, int prefetch, bool noLocal)
         {
-            this.session = session;
-            this.info = info;
+			if(destination == null)
+			{
+				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+			}
+			
+			this.session = session;
             this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
             this.messageTransformation = this.session.Connection.MessageTransformation;
-        }
+
+			this.info = new ConsumerInfo();
+			this.info.ConsumerId = id;
+			this.info.Destination = Destination.Transform(destination);
+			this.info.SubscriptionName = name;
+			this.info.Selector = selector;
+			this.info.PrefetchSize = prefetch;
+			this.info.MaximumPendingMessageLimit = session.Connection.PrefetchPolicy.MaximumPendingMessageLimit;
+			this.info.NoLocal = noLocal;
+			this.info.DispatchAsync = session.DispatchAsync;
+			this.info.Retroactive = session.Retroactive;
+			this.info.Exclusive = session.Exclusive;
+			this.info.Priority = session.Priority;
+			this.info.AckMode = session.AcknowledgementMode;
+
+			// If the destination contained a URI query, then use it to set public properties
+			// on the ConsumerInfo
+			if(destination.Options != null)
+			{
+				// Get options prefixed with "consumer.*"
+				StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+				// Extract out custom extension options "consumer.nms.*"
+				StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
+				URISupport.SetProperties(this.info, options);
+				URISupport.SetProperties(this, customConsumerOptions, "nms.");
+			}
+		}
 
         ~MessageConsumer()
         {
@@ -79,7 +111,12 @@
             get { return info.ConsumerId; }
         }
 
-        public int PrefetchSize
+		public ConsumerInfo ConsumerInfo
+		{
+			get { return this.info; }
+		}
+		
+		public int PrefetchSize
         {
             get { return this.info.PrefetchSize; }
         }
@@ -90,18 +127,26 @@
             set { this.redeliveryPolicy = value; }
         }
 
-        private ConsumerTransformerDelegate consumerTransformer;
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
-        }
+		// Custom Options
+		private bool ignoreExpiration = false;
+		public bool IgnoreExpiration
+		{
+			get { return ignoreExpiration; }
+			set { ignoreExpiration = value; }
+		}
 
-        #endregion
+		#endregion
 
         #region IMessageConsumer Members
 
-        public event MessageListener Listener
+		private ConsumerTransformerDelegate consumerTransformer;
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		public event MessageListener Listener
         {
             add
             {
@@ -424,7 +469,7 @@
 
                             try
                             {
-                                bool expired = message.IsExpired();
+								bool expired = (!IgnoreExpiration && message.IsExpired());
 
                                 if(!expired)
                                 {
@@ -548,7 +593,7 @@
                 {
                     return null;
                 }
-                else if(dispatch.Message.IsExpired())
+				else if(!IgnoreExpiration && dispatch.Message.IsExpired())
                 {
                     Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
 
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index c3efd85..e2056c6 100755
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -416,40 +416,47 @@
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            command.NoLocal = noLocal;
-            ConsumerId consumerId = command.ConsumerId;
-            MessageConsumer consumer = null;
+			int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
+			if(destination.IsTopic)
+			{
+				prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+			}
+			else if(destination.IsQueue)
+			{
+				prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+			}
+			
+            MessageConsumer consumer = null;
 
             try
             {
-                consumer = new MessageConsumer(this, command);
+	            Destination dest = destination as Destination;
+				consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize, noLocal);
                 consumer.ConsumerTransformer = this.ConsumerTransformer;
-                consumers[consumerId] = consumer;
+				this.AddConsumer(consumer);
 
-                if(this.Started)
+				// lets register the consumer first in case we start dispatching messages immediately
+				this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+				if(this.Started)
                 {
                     consumer.Start();
                 }
-
-                // lets register the consumer first in case we start dispatching messages immediately
-                this.Connection.SyncRequest(command);
-
-                return consumer;
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+					this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
                 throw;
             }
-        }
+
+			return consumer;
+		}
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
         {
@@ -458,33 +465,26 @@
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            ConsumerId consumerId = command.ConsumerId;
-            command.SubscriptionName = name;
-            command.NoLocal = noLocal;
-            command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
             MessageConsumer consumer = null;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
-
             try
             {
-                consumer = new MessageConsumer(this, command);
-                consumer.ConsumerTransformer = this.ConsumerTransformer;
-                consumers[consumerId] = consumer;
+				Destination dest = destination as Destination;
+				consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector, this.connection.PrefetchPolicy.DurableTopicPrefetch, noLocal);
+				consumer.ConsumerTransformer = this.ConsumerTransformer;
+				this.AddConsumer(consumer);
+				this.connection.SyncRequest(consumer.ConsumerInfo);
 
-                if(this.Started)
+				if(this.Started)
                 {
                     consumer.Start();
                 }
-
-                this.connection.SyncRequest(command);
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+					this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
@@ -633,7 +633,26 @@
 
         #endregion
 
-        public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout )
+		public void AddConsumer(MessageConsumer consumer)
+		{
+			if(!this.closing)
+			{
+				// Registered with Connection before we register at the broker.
+				consumers[consumer.ConsumerId] = consumer;
+				connection.addDispatcher(consumer.ConsumerId, this);
+			}
+		}
+
+		public void RemoveConsumer(MessageConsumer consumer)
+		{
+			connection.removeDispatcher(consumer.ConsumerId);
+			if(!this.closing)
+			{
+				consumers.Remove(consumer.ConsumerId);
+			}
+		}
+
+		public void DoSend(Message message, MessageProducer producer, TimeSpan sendTimeout)
         {
             Message msg = message;
 
@@ -699,52 +718,10 @@
             }
         }
 
-        protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
-        {
-            ConsumerInfo answer = new ConsumerInfo();
-            ConsumerId id = new ConsumerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref consumerCounter);
-            answer.ConsumerId = id;
-            answer.Destination = Destination.Transform(destination);
-            answer.Selector = selector;
-            answer.Priority = this.Priority;
-            answer.Exclusive = this.Exclusive;
-            answer.DispatchAsync = this.DispatchAsync;
-            answer.Retroactive = this.Retroactive;
-            answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
-            answer.AckMode = this.AcknowledgementMode;
-
-            if(destination is ITopic || destination is ITemporaryTopic)
-            {
-                answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
-            }
-            else if(destination is IQueue || destination is ITemporaryQueue)
-            {
-                answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
-            }
-
-            // If the destination contained a URI query, then use it to set public properties
-            // on the ConsumerInfo
-            Destination amqDestination = destination as Destination;
-            if(amqDestination != null && amqDestination.Options != null)
-            {
-                StringDictionary options = URISupport.GetProperties(amqDestination.Options, "consumer.");
-                URISupport.SetProperties(answer, options);
-            }
-
-            return answer;
-        }
-
         protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
         {
             ProducerInfo answer = new ProducerInfo();
-            ProducerId id = new ProducerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref producerCounter);
-            answer.ProducerId = id;
+            answer.ProducerId = GetNextProducerId();
             answer.Destination = Destination.Transform(destination);
 
             // If the destination contained a URI query, then use it to set public
@@ -759,7 +736,27 @@
             return answer;
         }
 
-        public void Stop()
+		public ConsumerId GetNextConsumerId()
+		{
+			ConsumerId id = new ConsumerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			id.Value = Interlocked.Increment(ref consumerCounter);
+
+			return id;
+		}
+
+		public ProducerId GetNextProducerId()
+		{
+			ProducerId id = new ProducerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			id.Value = Interlocked.Increment(ref producerCounter);
+
+			return id;
+		}
+
+		public void Stop()
         {
             if(this.executor != null)
             {
diff --git a/src/test/csharp/MessageConsumerTest.cs b/src/test/csharp/MessageConsumerTest.cs
new file mode 100644
index 0000000..1f62da5
--- /dev/null
+++ b/src/test/csharp/MessageConsumerTest.cs
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

+

+using System.Threading;

+using Apache.NMS.Test;

+using NUnit.Framework;

+using Apache.NMS.Stomp.Commands;

+using System;

+using Apache.NMS.Util;

+

+namespace Apache.NMS.Stomp.Test

+{

+    public enum ExpirationOptions

+    {

+        DEFAULT,

+        IGNORE,

+        DO_NOT_IGNORE

+    }

+

+    [TestFixture]

+    public class MessageConsumerTest : NMSTestSupport

+    {

+        protected static string DESTINATION_NAME = "queue://TEST.MessageConsumerTestDestination";

+        protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";

+

+        private CountDownLatch doneLatch;

+        private int counter;

+        private String errorMessage;

+

+        [SetUp]

+        public override void SetUp()

+        {

+            base.SetUp();

+

+            this.doneLatch = new CountDownLatch(1);

+            this.counter = 0;

+            this.errorMessage = null;

+        }

+

+        [Test]

+        public void TestBadSelectorDoesNotCloseConnection()

+        {

+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))

+            {

+                using(ISession sender = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

+                {

+                    IDestination destination = sender.CreateTemporaryQueue();

+

+                    IMessageProducer producer = sender.CreateProducer(destination);

+                    ITextMessage goodMsg = sender.CreateTextMessage("testGood");

+                    producer.Send(goodMsg);

+

+                    IMessageConsumer consumer = session.CreateConsumer(destination);

+                    connection.Start();

+                    Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));

+

+                    try

+                    {

+                        ISession badListenerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

+                        badListenerSession.CreateConsumer(destination, "badSelector;too");

+                        Assert.Fail("Exception expected.");

+                    }

+                    catch(Exception e)

+                    {

+                        Tracer.DebugFormat("Caught Ex: {0}", e);

+                    }

+

+                    ITextMessage failMsg = sender.CreateTextMessage("testFail");

+                    producer.Send(failMsg);

+                    Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));

+                }

+            }

+        }

+

+        [Test]

+        public void TestAsyncDispatchExceptionRedelivers()

+        {

+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))

+            {

+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))

+                {

+                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue;

+

+                    using(IMessageProducer producer = session.CreateProducer(queue))

+                    {

+                        producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

+                        producer.Send(producer.CreateTextMessage("First"));

+                        producer.Send(producer.CreateTextMessage("Second"));

+                    }

+

+                    using(IMessageConsumer consumer = session.CreateConsumer(queue))

+                    {

+                        consumer.Listener += OnTestAsynchRedliversMessage;

+

+                        connection.Start();

+

+                        if(doneLatch.await(TimeSpan.FromSeconds(10)))

+                        {

+                            if(!String.IsNullOrEmpty(errorMessage))

+                            {

+                                Assert.Fail(errorMessage);

+                            }

+                        }

+                        else

+                        {

+                            Assert.Fail("Timeout waiting for async message delivery to complete.");

+                        }

+                    }

+                }

+            }

+        }

+

+        private void OnTestAsynchRedliversMessage(IMessage msg)

+        {

+            counter++;

+            try

+            {

+                ITextMessage message = msg as ITextMessage;

+                switch(counter)

+                {

+                    case 1:

+                        Tracer.Debug("Got first Message: " + message.Text);

+                        Assert.AreEqual("First", message.Text);

+                        Assert.IsFalse(message.NMSRedelivered);

+                        break;

+                    case 2:

+                        Tracer.Debug("Got Second Message: " + message.Text);

+                        Assert.AreEqual("Second", message.Text);

+                        Assert.IsFalse(message.NMSRedelivered);

+                        throw new Exception("Ignore Me");

+                    case 3:

+                        Tracer.Debug("Got Third Message: " + message.Text);

+                        Assert.AreEqual("Second", message.Text);

+                        Assert.IsTrue(message.NMSRedelivered);

+                        doneLatch.countDown();

+                        break;

+                    default:

+                        errorMessage = "Got too many messages: " + counter;

+                        Tracer.Debug(errorMessage);

+                        doneLatch.countDown();

+                        break;

+                }

+            }

+            catch(Exception e)

+            {

+                if(e.Message.Equals("Ignore Me"))

+                {

+                    throw;

+                }

+                errorMessage = "Got exception: " + e.Message;

+                Tracer.Warn("Exception on Message Receive: " + e.Message);

+                doneLatch.countDown();

+            }

+        }

+

+        [Test]

+        public void ConsumeInTwoThreads()

+        {

+            ParameterizedThreadStart threadStart =
+                delegate(object o)

+                {

+                    IMessageConsumer consumer = (IMessageConsumer) o;

+                    IMessage message = consumer.Receive(TimeSpan.FromSeconds(2));

+                    Assert.IsNotNull(message);

+                };

+

+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))

+            {

+                connection.Start();

+                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))

+                {

+                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue;

+

+                    // enqueue 2 messages

+                    using(IMessageConsumer consumer = session.CreateConsumer(queue))

+                    using(IMessageProducer producer = session.CreateProducer(queue))

+                    {

+                        producer.DeliveryMode = MsgDeliveryMode.Persistent;

+                        producer.Send(producer.CreateMessage());

+                        producer.Send(producer.CreateMessage());

+                        session.Commit();

+

+                        // receive first using a dedicated thread. This works

+                        Thread thread = new Thread(threadStart);

+                        thread.Start(consumer);

+                        thread.Join();

+                        session.Commit();

+

+                        // receive second using main thread. This FAILS

+                        IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); // throws System.Threading.AbandonedMutexException

+                        Assert.IsNotNull(message);

+                        session.Commit();

+                    }

+                }

+            }

+        }

+

+        [Test]

+        public void TestReceiveIgnoreExpirationMessage(

+            [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,

+                AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+            AcknowledgementMode ackMode,

+            [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+            MsgDeliveryMode deliveryMode,

+            [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+            ExpirationOptions expirationOption)

+        {

+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))

+            {

+                connection.Start();

+                using(Session session = connection.CreateSession(ackMode) as Session)

+                {

+                    string destinationName = DESTINATION_NAME;

+

+                    if(ExpirationOptions.IGNORE == expirationOption)

+                    {

+                        destinationName += "?consumer.nms.ignoreExpiration=true";

+                    }

+                    else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)

+                    {

+                        destinationName += "?consumer.nms.ignoreExpiration=false";

+                    }

+

+                    try

+                    {

+                        IDestination destination = SessionUtil.GetDestination(session, destinationName);

+

+                        using(IMessageConsumer consumer = session.CreateConsumer(destination))

+                        using(IMessageProducer producer = session.CreateProducer(destination))

+                        {

+                            producer.DeliveryMode = deliveryMode;

+

+                            string msgText = string.Format("ExpiredMessage: {0}", Guid.NewGuid().ToString());

+

+                            TextMessage msg = session.CreateTextMessage(msgText) as TextMessage;

+

+                            // Give it two seconds to live.

+                            msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);

+

+                            producer.Send(msg);

+

+                            if(AcknowledgementMode.Transactional == ackMode)

+                            {

+                                session.Commit();

+                            }

+

+                            // Wait for four seconds before processing it.  The broker will have sent it to our local

+                            // client dispatch queue, but we won't attempt to process the message until it has had

+                            // a chance to expire within our internal queue system.

+                            Thread.Sleep(4000);

+

+                            TextMessage rcvMsg = consumer.ReceiveNoWait() as TextMessage;

+

+                            if(ExpirationOptions.IGNORE == expirationOption)

+                            {

+                                Assert.IsNotNull(rcvMsg, "Did not receive expired message.");

+                                rcvMsg.Acknowledge();

+

+                                Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");

+                                Assert.IsTrue(rcvMsg.IsExpired());

+

+                                if(AcknowledgementMode.Transactional == ackMode)

+                                {

+                                    session.Commit();

+                                }

+                            }

+                            else

+                            {

+                                // Should not receive a message.

+                                Assert.IsNull(rcvMsg, "Received an expired message!");

+                            }

+

+                            consumer.Close();

+                            producer.Close();

+                        }

+                    }

+                    finally

+                    {

+                        try

+                        {

+                            // Ensure that Session resources on the Broker release transacted Consumers.

+                            session.Close();

+                            // Give the Broker some time to remove the subscriptions.

+                            Thread.Sleep(2000);

+                            SessionUtil.DeleteDestination(session, destinationName);

+                        }

+                        catch

+                        {

+                        }

+                    }

+                }

+            }

+        }

+    }

+}

diff --git a/vs2008-stomp-test.csproj b/vs2008-stomp-test.csproj
index d93579e..b3a6a96 100644
--- a/vs2008-stomp-test.csproj
+++ b/vs2008-stomp-test.csproj
@@ -68,6 +68,7 @@
       <HintPath>lib\NUnit\net-2.0\nunit.framework.dll</HintPath>

     </Reference>

     <Reference Include="System" />

+    <Reference Include="System.Data" />

     <Reference Include="System.Xml" />

   </ItemGroup>

   <ItemGroup>

@@ -104,6 +105,7 @@
   <ItemGroup>

     <Compile Include="src\test\csharp\AMQNET383Test.cs" />

     <Compile Include="src\test\csharp\InvalidCredentialsTest.cs" />

+    <Compile Include="src\test\csharp\MessageConsumerTest.cs" />

     <Compile Include="src\test\csharp\StompHelperTest.cs" />

     <Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" />

     <Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />