Ensure the unacknowledged message are rolled back from the duplicate tracker on consumer close.
Fixes [AMQNET-AMQNET-506]. (See https://issues.apache.org/jira/browse/AMQNET-AMQNET-506)
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 62e64d0..41df167 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -507,7 +507,7 @@
this.session.Scheduler.Cancel(this.optimizedAckTask);
}
- if (this.session.IsClientAcknowledge)
+ if (this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
{
if (!this.info.Browser)
{
diff --git a/src/test/csharp/IndividualAckTest.cs b/src/test/csharp/IndividualAckTest.cs
index fe95593..a1c68fb 100644
--- a/src/test/csharp/IndividualAckTest.cs
+++ b/src/test/csharp/IndividualAckTest.cs
@@ -260,5 +260,62 @@
Assert.IsNull(msg);
session.Close();
}
+
+ [Test]
+ public void TestIndividualAcksWithClosedConsumerAndAuditSync()
+ {
+ const int MSG_COUNT = 20;
+ const string QUEUE_NAME = "TEST.TestIndividualAcksWithClosedConsumerAndAuditSync";
+
+ ProduceSomeMessages(MSG_COUNT, QUEUE_NAME);
+
+ string uri = "failover:(tcp://${activemqhost}:61616)";
+ IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+
+ using (IConnection connection = factory.CreateConnection() as Connection)
+ using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+ using (IQueue queue = session.GetQueue(QUEUE_NAME))
+ {
+ connection.Start();
+
+ // Consume all messages with no ACK
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < MSG_COUNT; ++i)
+ {
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(message);
+ Tracer.DebugFormat("Received message: {0}", message.NMSMessageId);
+ }
+ }
+
+ // Consumer the same batch again.
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < MSG_COUNT; ++i)
+ {
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(message);
+ Tracer.DebugFormat("Received message: {0}", message.NMSMessageId);
+ }
+ }
+
+ session.DeleteDestination(queue);
+ }
+ }
+
+ private void ProduceSomeMessages(int count, string queueName)
+ {
+ using (IConnection connection = CreateConnection())
+ using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+ using (IQueue queue = session.GetQueue(queueName))
+ using (IMessageProducer producer = session.CreateProducer(queue))
+ {
+ for (int i = 0; i < count; ++i)
+ {
+ producer.Send(session.CreateMessage());
+ }
+ }
+ }
}
}