using System; | |
using System.Threading; | |
using Amqp.Framing; | |
using Apache.NMS; | |
using NMS.AMQP.Test.TestAmqp; | |
using NUnit.Framework; | |
namespace NMS.AMQP.Test.Integration | |
{ | |
[TestFixture] | |
public class MessageExpirationIntegrationTest : IntegrationTestFixture | |
{ | |
[Test, Timeout(20_000)] | |
public void TestIncomingExpiredMessageGetsFiltered() | |
{ | |
using (TestAmqpPeer testPeer = new TestAmqpPeer()) | |
{ | |
IConnection connection = EstablishConnection(testPeer); | |
connection.Start(); | |
testPeer.ExpectBegin(); | |
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); | |
IQueue queue = session.GetQueue("myQueue"); | |
// Expected the consumer to attach and send credit, then send it an | |
// already-expired message followed by a live message. | |
testPeer.ExpectReceiverAttach(); | |
string expiredMsgContent = "already-expired"; | |
Amqp.Message message = CreateExpiredMessage(expiredMsgContent); | |
testPeer.ExpectLinkFlowRespondWithTransfer(message: message); | |
string liveMsgContent = "valid"; | |
testPeer.SendTransferToLastOpenedLinkOnLastOpenedSession(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = liveMsgContent } }, nextIncomingId: 2); | |
IMessageConsumer consumer = session.CreateConsumer(queue); | |
// Call receive, expect the first message to be filtered due to expiry, | |
// and the second message to be given to the test app and accepted. | |
Action<DeliveryState> modifiedMatcher = state => | |
{ | |
var modified = state as Modified; | |
Assert.IsNotNull(modified); | |
Assert.IsTrue(modified.DeliveryFailed); | |
Assert.IsTrue(modified.UndeliverableHere); | |
}; | |
testPeer.ExpectDisposition(settled: true, stateMatcher: modifiedMatcher, firstDeliveryId: 1, lastDeliveryId: 1); | |
testPeer.ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Accepted>, firstDeliveryId: 2, lastDeliveryId: 2); | |
IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); | |
Assert.NotNull(m, "Message should have been received"); | |
Assert.IsInstanceOf<ITextMessage>(m); | |
Assert.AreEqual(liveMsgContent, (m as ITextMessage).Text, "Unexpected message content"); | |
// Verify the other message is not there. Will drain to be sure there are no messages. | |
Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(10)), "Message should not have been received"); | |
testPeer.ExpectClose(); | |
connection.Close(); | |
testPeer.WaitForAllMatchersToComplete(3000); | |
} | |
} | |
[Test, Timeout(20_000)] | |
public void TestIncomingExpiredMessageGetsConsumedWhenFilterDisabled() | |
{ | |
using (TestAmqpPeer testPeer = new TestAmqpPeer()) | |
{ | |
IConnection connection = EstablishConnection(testPeer, "?nms.localMessageExpiry=false"); | |
connection.Start(); | |
testPeer.ExpectBegin(); | |
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); | |
IQueue queue = session.GetQueue("myQueue"); | |
// Expected the consumer to attach and send credit, then send it an | |
// already-expired message followed by a live message. | |
testPeer.ExpectReceiverAttach(); | |
string expiredMsgContent = "already-expired"; | |
Amqp.Message message = CreateExpiredMessage(expiredMsgContent); | |
testPeer.ExpectLinkFlowRespondWithTransfer(message: message); | |
string liveMsgContent = "valid"; | |
testPeer.SendTransferToLastOpenedLinkOnLastOpenedSession(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = liveMsgContent } }, nextIncomingId: 2); | |
IMessageConsumer consumer = session.CreateConsumer(queue); | |
// Call receive, expect the expired message as we disabled local expiry. | |
testPeer.ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Accepted>, firstDeliveryId: 1, lastDeliveryId: 1); | |
IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); | |
Assert.NotNull(m, "Message should have been received"); | |
Assert.IsInstanceOf<ITextMessage>(m); | |
Assert.AreEqual(expiredMsgContent, ((ITextMessage) m).Text, "Unexpected message content"); | |
// Verify the other message is there | |
testPeer.ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Accepted>, firstDeliveryId: 2, lastDeliveryId: 2); | |
m = consumer.Receive(TimeSpan.FromMilliseconds(3000)); | |
Assert.NotNull(m, "Message should have been received"); | |
Assert.IsInstanceOf<ITextMessage>(m); | |
Assert.AreEqual(liveMsgContent, ((ITextMessage) m).Text, "Unexpected message content"); | |
testPeer.ExpectClose(); | |
connection.Close(); | |
testPeer.WaitForAllMatchersToComplete(3000); | |
} | |
} | |
[Test, Timeout(20_000)] | |
public void TestIncomingExpiredMessageGetsFilteredAsync() | |
{ | |
using (TestAmqpPeer testPeer = new TestAmqpPeer()) | |
{ | |
IConnection connection = EstablishConnection(testPeer); | |
connection.Start(); | |
testPeer.ExpectBegin(); | |
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); | |
IQueue queue = session.GetQueue("myQueue"); | |
// Expected the consumer to attach and send credit, then send it an | |
// already-expired message followed by a live message. | |
testPeer.ExpectReceiverAttach(); | |
string expiredMsgContent = "already-expired"; | |
Amqp.Message message = CreateExpiredMessage(expiredMsgContent); | |
testPeer.ExpectLinkFlowRespondWithTransfer(message: message); | |
string liveMsgContent = "valid"; | |
testPeer.SendTransferToLastOpenedLinkOnLastOpenedSession(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = liveMsgContent } }, nextIncomingId: 2); | |
IMessageConsumer consumer = session.CreateConsumer(queue); | |
// Add message listener, expect the first message to be filtered due to expiry, | |
// and the second message to be given to the test app and accepted. | |
Action<DeliveryState> modifiedMatcher = state => | |
{ | |
var modified = state as Modified; | |
Assert.IsNotNull(modified); | |
Assert.IsTrue(modified.DeliveryFailed); | |
Assert.IsTrue(modified.UndeliverableHere); | |
}; | |
testPeer.ExpectDisposition(settled: true, stateMatcher: modifiedMatcher, firstDeliveryId: 1, lastDeliveryId: 1); | |
testPeer.ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Accepted>, firstDeliveryId: 2, lastDeliveryId: 2); | |
ManualResetEvent success = new ManualResetEvent(false); | |
ManualResetEvent listenerFailure = new ManualResetEvent(false); | |
consumer.Listener += m => | |
{ | |
if (liveMsgContent.Equals(((ITextMessage) m).Text)) | |
success.Set(); | |
else | |
listenerFailure.Set(); | |
}; | |
Assert.True(success.WaitOne(TimeSpan.FromSeconds(5)), "didn't get expected message"); | |
Assert.False(listenerFailure.WaitOne(TimeSpan.FromMilliseconds(100)), "Received message when message should not have been received"); | |
testPeer.WaitForAllMatchersToComplete(3000); | |
testPeer.ExpectClose(); | |
connection.Close(); | |
testPeer.WaitForAllMatchersToComplete(3000); | |
} | |
} | |
[Test, Timeout(20_000)] | |
public void TestIncomingExpiredMessageGetsConsumedWhenFilterDisabledAsync() | |
{ | |
using (TestAmqpPeer testPeer = new TestAmqpPeer()) | |
{ | |
IConnection connection = EstablishConnection(testPeer, "?nms.localMessageExpiry=false"); | |
connection.Start(); | |
testPeer.ExpectBegin(); | |
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); | |
IQueue queue = session.GetQueue("myQueue"); | |
// Expected the consumer to attach and send credit, then send it an | |
// already-expired message followed by a live message. | |
testPeer.ExpectReceiverAttach(); | |
string expiredMsgContent = "already-expired"; | |
Amqp.Message message = CreateExpiredMessage(expiredMsgContent); | |
testPeer.ExpectLinkFlowRespondWithTransfer(message: message); | |
string liveMsgContent = "valid"; | |
testPeer.SendTransferToLastOpenedLinkOnLastOpenedSession(message: new Amqp.Message() { BodySection = new AmqpValue() { Value = liveMsgContent } }, nextIncomingId: 2); | |
IMessageConsumer consumer = session.CreateConsumer(queue); | |
// Add message listener, expect both messages as the filter is disabled | |
testPeer.ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Accepted>, firstDeliveryId:1, lastDeliveryId:1); | |
testPeer.ExpectDisposition(settled: true, stateMatcher: Assert.IsInstanceOf<Accepted>, firstDeliveryId:2, lastDeliveryId:2); | |
CountdownEvent success = new CountdownEvent(2); | |
consumer.Listener += m => | |
{ | |
if (expiredMsgContent.Equals(((ITextMessage) m).Text) || liveMsgContent.Equals(((ITextMessage) m).Text)) | |
success.Signal(); | |
}; | |
Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(5)), "Didn't get expected messages"); | |
testPeer.WaitForAllMatchersToComplete(3000); | |
testPeer.ExpectClose(); | |
connection.Close(); | |
testPeer.WaitForAllMatchersToComplete(3000); | |
} | |
} | |
private static Amqp.Message CreateExpiredMessage(string value) | |
{ | |
return new Amqp.Message | |
{ | |
BodySection = new AmqpValue() { Value = value }, | |
Properties = new Properties { AbsoluteExpiryTime = DateTime.UtcNow - TimeSpan.FromMilliseconds(100) } | |
}; | |
} | |
} | |
} |