Copy the ignoreExpiration implementation from the OpenWire provider.
Fixes [AMQNET-478]. (See https://issues.apache.org/jira/browse/AMQNET-478)
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 75c912d..1f79a7a 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
using System;
+using System.Collections.Specialized;
using System.Threading;
using System.Collections.Generic;
using Apache.NMS.Stomp.Commands;
@@ -58,14 +59,45 @@
private IRedeliveryPolicy redeliveryPolicy;
private Exception failureError;
- // Constructor internal to prevent clients from creating an instance.
- internal MessageConsumer(Session session, ConsumerInfo info)
+ // Constructor internal to prevent clients from creating an instance.
+ internal MessageConsumer(Session session, ConsumerId id, Destination destination, string name, string selector, int prefetch, bool noLocal)
{
- this.session = session;
- this.info = info;
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ }
+
+ this.session = session;
this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
this.messageTransformation = this.session.Connection.MessageTransformation;
- }
+
+ this.info = new ConsumerInfo();
+ this.info.ConsumerId = id;
+ this.info.Destination = Destination.Transform(destination);
+ this.info.SubscriptionName = name;
+ this.info.Selector = selector;
+ this.info.PrefetchSize = prefetch;
+ this.info.MaximumPendingMessageLimit = session.Connection.PrefetchPolicy.MaximumPendingMessageLimit;
+ this.info.NoLocal = noLocal;
+ this.info.DispatchAsync = session.DispatchAsync;
+ this.info.Retroactive = session.Retroactive;
+ this.info.Exclusive = session.Exclusive;
+ this.info.Priority = session.Priority;
+ this.info.AckMode = session.AcknowledgementMode;
+
+ // If the destination contained a URI query, then use it to set public properties
+ // on the ConsumerInfo
+ if(destination.Options != null)
+ {
+ // Get options prefixed with "consumer.*"
+ StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+ // Extract out custom extension options "consumer.nms.*"
+ StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
+ URISupport.SetProperties(this.info, options);
+ URISupport.SetProperties(this, customConsumerOptions, "nms.");
+ }
+ }
~MessageConsumer()
{
@@ -79,7 +111,12 @@
get { return info.ConsumerId; }
}
- public int PrefetchSize
+ public ConsumerInfo ConsumerInfo
+ {
+ get { return this.info; }
+ }
+
+ public int PrefetchSize
{
get { return this.info.PrefetchSize; }
}
@@ -90,18 +127,26 @@
set { this.redeliveryPolicy = value; }
}
- private ConsumerTransformerDelegate consumerTransformer;
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get { return this.consumerTransformer; }
- set { this.consumerTransformer = value; }
- }
+ // Custom Options
+ private bool ignoreExpiration = false;
+ public bool IgnoreExpiration
+ {
+ get { return ignoreExpiration; }
+ set { ignoreExpiration = value; }
+ }
- #endregion
+ #endregion
#region IMessageConsumer Members
- public event MessageListener Listener
+ private ConsumerTransformerDelegate consumerTransformer;
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumerTransformer; }
+ set { this.consumerTransformer = value; }
+ }
+
+ public event MessageListener Listener
{
add
{
@@ -424,7 +469,7 @@
try
{
- bool expired = message.IsExpired();
+ bool expired = (!IgnoreExpiration && message.IsExpired());
if(!expired)
{
@@ -548,7 +593,7 @@
{
return null;
}
- else if(dispatch.Message.IsExpired())
+ else if(!IgnoreExpiration && dispatch.Message.IsExpired())
{
Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index c3efd85..e2056c6 100755
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -416,40 +416,47 @@
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
- ConsumerInfo command = CreateConsumerInfo(destination, selector);
- command.NoLocal = noLocal;
- ConsumerId consumerId = command.ConsumerId;
- MessageConsumer consumer = null;
+ int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
- // Registered with Connection before we register at the broker.
- connection.addDispatcher(consumerId, this);
+ if(destination.IsTopic)
+ {
+ prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+ }
+ else if(destination.IsQueue)
+ {
+ prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+ }
+
+ MessageConsumer consumer = null;
try
{
- consumer = new MessageConsumer(this, command);
+ Destination dest = destination as Destination;
+ consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize, noLocal);
consumer.ConsumerTransformer = this.ConsumerTransformer;
- consumers[consumerId] = consumer;
+ this.AddConsumer(consumer);
- if(this.Started)
+ // lets register the consumer first in case we start dispatching messages immediately
+ this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+ if(this.Started)
{
consumer.Start();
}
-
- // lets register the consumer first in case we start dispatching messages immediately
- this.Connection.SyncRequest(command);
-
- return consumer;
}
catch(Exception)
{
if(consumer != null)
{
+ this.RemoveConsumer(consumer);
consumer.Close();
}
throw;
}
- }
+
+ return consumer;
+ }
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
@@ -458,33 +465,26 @@
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
- ConsumerInfo command = CreateConsumerInfo(destination, selector);
- ConsumerId consumerId = command.ConsumerId;
- command.SubscriptionName = name;
- command.NoLocal = noLocal;
- command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
MessageConsumer consumer = null;
- // Registered with Connection before we register at the broker.
- connection.addDispatcher(consumerId, this);
-
try
{
- consumer = new MessageConsumer(this, command);
- consumer.ConsumerTransformer = this.ConsumerTransformer;
- consumers[consumerId] = consumer;
+ Destination dest = destination as Destination;
+ consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector, this.connection.PrefetchPolicy.DurableTopicPrefetch, noLocal);
+ consumer.ConsumerTransformer = this.ConsumerTransformer;
+ this.AddConsumer(consumer);
+ this.connection.SyncRequest(consumer.ConsumerInfo);
- if(this.Started)
+ if(this.Started)
{
consumer.Start();
}
-
- this.connection.SyncRequest(command);
}
catch(Exception)
{
if(consumer != null)
{
+ this.RemoveConsumer(consumer);
consumer.Close();
}
@@ -633,7 +633,26 @@
#endregion
- public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout )
+ public void AddConsumer(MessageConsumer consumer)
+ {
+ if(!this.closing)
+ {
+ // Registered with Connection before we register at the broker.
+ consumers[consumer.ConsumerId] = consumer;
+ connection.addDispatcher(consumer.ConsumerId, this);
+ }
+ }
+
+ public void RemoveConsumer(MessageConsumer consumer)
+ {
+ connection.removeDispatcher(consumer.ConsumerId);
+ if(!this.closing)
+ {
+ consumers.Remove(consumer.ConsumerId);
+ }
+ }
+
+ public void DoSend(Message message, MessageProducer producer, TimeSpan sendTimeout)
{
Message msg = message;
@@ -699,52 +718,10 @@
}
}
- protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
- {
- ConsumerInfo answer = new ConsumerInfo();
- ConsumerId id = new ConsumerId();
- id.ConnectionId = info.SessionId.ConnectionId;
- id.SessionId = info.SessionId.Value;
- id.Value = Interlocked.Increment(ref consumerCounter);
- answer.ConsumerId = id;
- answer.Destination = Destination.Transform(destination);
- answer.Selector = selector;
- answer.Priority = this.Priority;
- answer.Exclusive = this.Exclusive;
- answer.DispatchAsync = this.DispatchAsync;
- answer.Retroactive = this.Retroactive;
- answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
- answer.AckMode = this.AcknowledgementMode;
-
- if(destination is ITopic || destination is ITemporaryTopic)
- {
- answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
- }
- else if(destination is IQueue || destination is ITemporaryQueue)
- {
- answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
- }
-
- // If the destination contained a URI query, then use it to set public properties
- // on the ConsumerInfo
- Destination amqDestination = destination as Destination;
- if(amqDestination != null && amqDestination.Options != null)
- {
- StringDictionary options = URISupport.GetProperties(amqDestination.Options, "consumer.");
- URISupport.SetProperties(answer, options);
- }
-
- return answer;
- }
-
protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
{
ProducerInfo answer = new ProducerInfo();
- ProducerId id = new ProducerId();
- id.ConnectionId = info.SessionId.ConnectionId;
- id.SessionId = info.SessionId.Value;
- id.Value = Interlocked.Increment(ref producerCounter);
- answer.ProducerId = id;
+ answer.ProducerId = GetNextProducerId();
answer.Destination = Destination.Transform(destination);
// If the destination contained a URI query, then use it to set public
@@ -759,7 +736,27 @@
return answer;
}
- public void Stop()
+ public ConsumerId GetNextConsumerId()
+ {
+ ConsumerId id = new ConsumerId();
+ id.ConnectionId = info.SessionId.ConnectionId;
+ id.SessionId = info.SessionId.Value;
+ id.Value = Interlocked.Increment(ref consumerCounter);
+
+ return id;
+ }
+
+ public ProducerId GetNextProducerId()
+ {
+ ProducerId id = new ProducerId();
+ id.ConnectionId = info.SessionId.ConnectionId;
+ id.SessionId = info.SessionId.Value;
+ id.Value = Interlocked.Increment(ref producerCounter);
+
+ return id;
+ }
+
+ public void Stop()
{
if(this.executor != null)
{
diff --git a/src/test/csharp/MessageConsumerTest.cs b/src/test/csharp/MessageConsumerTest.cs
new file mode 100644
index 0000000..1f62da5
--- /dev/null
+++ b/src/test/csharp/MessageConsumerTest.cs
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using Apache.NMS.Stomp.Commands;
+using System;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Test
+{
+ public enum ExpirationOptions
+ {
+ DEFAULT,
+ IGNORE,
+ DO_NOT_IGNORE
+ }
+
+ [TestFixture]
+ public class MessageConsumerTest : NMSTestSupport
+ {
+ protected static string DESTINATION_NAME = "queue://TEST.MessageConsumerTestDestination";
+ protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
+
+ private CountDownLatch doneLatch;
+ private int counter;
+ private String errorMessage;
+
+ [SetUp]
+ public override void SetUp()
+ {
+ base.SetUp();
+
+ this.doneLatch = new CountDownLatch(1);
+ this.counter = 0;
+ this.errorMessage = null;
+ }
+
+ [Test]
+ public void TestBadSelectorDoesNotCloseConnection()
+ {
+ using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+ {
+ using(ISession sender = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ IDestination destination = sender.CreateTemporaryQueue();
+
+ IMessageProducer producer = sender.CreateProducer(destination);
+ ITextMessage goodMsg = sender.CreateTextMessage("testGood");
+ producer.Send(goodMsg);
+
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ connection.Start();
+ Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+
+ try
+ {
+ ISession badListenerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ badListenerSession.CreateConsumer(destination, "badSelector;too");
+ Assert.Fail("Exception expected.");
+ }
+ catch(Exception e)
+ {
+ Tracer.DebugFormat("Caught Ex: {0}", e);
+ }
+
+ ITextMessage failMsg = sender.CreateTextMessage("testFail");
+ producer.Send(failMsg);
+ Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+ }
+ }
+ }
+
+ [Test]
+ public void TestAsyncDispatchExceptionRedelivers()
+ {
+ using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+ {
+ using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue;
+
+ using(IMessageProducer producer = session.CreateProducer(queue))
+ {
+ producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+ producer.Send(producer.CreateTextMessage("First"));
+ producer.Send(producer.CreateTextMessage("Second"));
+ }
+
+ using(IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ consumer.Listener += OnTestAsynchRedliversMessage;
+
+ connection.Start();
+
+ if(doneLatch.await(TimeSpan.FromSeconds(10)))
+ {
+ if(!String.IsNullOrEmpty(errorMessage))
+ {
+ Assert.Fail(errorMessage);
+ }
+ }
+ else
+ {
+ Assert.Fail("Timeout waiting for async message delivery to complete.");
+ }
+ }
+ }
+ }
+ }
+
+ private void OnTestAsynchRedliversMessage(IMessage msg)
+ {
+ counter++;
+ try
+ {
+ ITextMessage message = msg as ITextMessage;
+ switch(counter)
+ {
+ case 1:
+ Tracer.Debug("Got first Message: " + message.Text);
+ Assert.AreEqual("First", message.Text);
+ Assert.IsFalse(message.NMSRedelivered);
+ break;
+ case 2:
+ Tracer.Debug("Got Second Message: " + message.Text);
+ Assert.AreEqual("Second", message.Text);
+ Assert.IsFalse(message.NMSRedelivered);
+ throw new Exception("Ignore Me");
+ case 3:
+ Tracer.Debug("Got Third Message: " + message.Text);
+ Assert.AreEqual("Second", message.Text);
+ Assert.IsTrue(message.NMSRedelivered);
+ doneLatch.countDown();
+ break;
+ default:
+ errorMessage = "Got too many messages: " + counter;
+ Tracer.Debug(errorMessage);
+ doneLatch.countDown();
+ break;
+ }
+ }
+ catch(Exception e)
+ {
+ if(e.Message.Equals("Ignore Me"))
+ {
+ throw;
+ }
+ errorMessage = "Got exception: " + e.Message;
+ Tracer.Warn("Exception on Message Receive: " + e.Message);
+ doneLatch.countDown();
+ }
+ }
+
+ [Test]
+ public void ConsumeInTwoThreads()
+ {
+ ParameterizedThreadStart threadStart =
+ delegate(object o)
+ {
+ IMessageConsumer consumer = (IMessageConsumer) o;
+ IMessage message = consumer.Receive(TimeSpan.FromSeconds(2));
+ Assert.IsNotNull(message);
+ };
+
+ using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+ {
+ connection.Start();
+ using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+ {
+ IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue;
+
+ // enqueue 2 messages
+ using(IMessageConsumer consumer = session.CreateConsumer(queue))
+ using(IMessageProducer producer = session.CreateProducer(queue))
+ {
+ producer.DeliveryMode = MsgDeliveryMode.Persistent;
+ producer.Send(producer.CreateMessage());
+ producer.Send(producer.CreateMessage());
+ session.Commit();
+
+ // receive first using a dedicated thread. This works
+ Thread thread = new Thread(threadStart);
+ thread.Start(consumer);
+ thread.Join();
+ session.Commit();
+
+ // receive second using main thread. This FAILS
+ IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); // throws System.Threading.AbandonedMutexException
+ Assert.IsNotNull(message);
+ session.Commit();
+ }
+ }
+ }
+ }
+
+ [Test]
+ public void TestReceiveIgnoreExpirationMessage(
+ [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+ AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+ AcknowledgementMode ackMode,
+ [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+ MsgDeliveryMode deliveryMode,
+ [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+ ExpirationOptions expirationOption)
+ {
+ using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+ {
+ connection.Start();
+ using(Session session = connection.CreateSession(ackMode) as Session)
+ {
+ string destinationName = DESTINATION_NAME;
+
+ if(ExpirationOptions.IGNORE == expirationOption)
+ {
+ destinationName += "?consumer.nms.ignoreExpiration=true";
+ }
+ else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
+ {
+ destinationName += "?consumer.nms.ignoreExpiration=false";
+ }
+
+ try
+ {
+ IDestination destination = SessionUtil.GetDestination(session, destinationName);
+
+ using(IMessageConsumer consumer = session.CreateConsumer(destination))
+ using(IMessageProducer producer = session.CreateProducer(destination))
+ {
+ producer.DeliveryMode = deliveryMode;
+
+ string msgText = string.Format("ExpiredMessage: {0}", Guid.NewGuid().ToString());
+
+ TextMessage msg = session.CreateTextMessage(msgText) as TextMessage;
+
+ // Give it two seconds to live.
+ msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);
+
+ producer.Send(msg);
+
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+
+ // Wait for four seconds before processing it. The broker will have sent it to our local
+ // client dispatch queue, but we won't attempt to process the message until it has had
+ // a chance to expire within our internal queue system.
+ Thread.Sleep(4000);
+
+ TextMessage rcvMsg = consumer.ReceiveNoWait() as TextMessage;
+
+ if(ExpirationOptions.IGNORE == expirationOption)
+ {
+ Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+ rcvMsg.Acknowledge();
+
+ Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");
+ Assert.IsTrue(rcvMsg.IsExpired());
+
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+ }
+ else
+ {
+ // Should not receive a message.
+ Assert.IsNull(rcvMsg, "Received an expired message!");
+ }
+
+ consumer.Close();
+ producer.Close();
+ }
+ }
+ finally
+ {
+ try
+ {
+ // Ensure that Session resources on the Broker release transacted Consumers.
+ session.Close();
+ // Give the Broker some time to remove the subscriptions.
+ Thread.Sleep(2000);
+ SessionUtil.DeleteDestination(session, destinationName);
+ }
+ catch
+ {
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/vs2008-stomp-test.csproj b/vs2008-stomp-test.csproj
index d93579e..b3a6a96 100644
--- a/vs2008-stomp-test.csproj
+++ b/vs2008-stomp-test.csproj
@@ -68,6 +68,7 @@
<HintPath>lib\NUnit\net-2.0\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
@@ -104,6 +105,7 @@
<ItemGroup>
<Compile Include="src\test\csharp\AMQNET383Test.cs" />
<Compile Include="src\test\csharp\InvalidCredentialsTest.cs" />
+ <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
<Compile Include="src\test\csharp\StompHelperTest.cs" />
<Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" />
<Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />