Made slow consumer test into an explicit test.
Fixes [AMQNET-273]. (See https://issues.apache.org/activemq/browse/AMQNET-273)
diff --git a/src/test/csharp/MessageSelectorTest.cs b/src/test/csharp/MessageSelectorTest.cs
index 3b3a6e4..fe0f3dc 100644
--- a/src/test/csharp/MessageSelectorTest.cs
+++ b/src/test/csharp/MessageSelectorTest.cs
@@ -33,12 +33,37 @@
private int receivedNonIgnoredMsgCount = 0;
private int receivedIgnoredMsgCount = 0;
+ private bool simulateSlowConsumer = false;
[Test]
public void FilterIgnoredMessagesTest(
[Values(QUEUE_DESTINATION_NAME, TOPIC_DESTINATION_NAME)]
string destinationName)
{
+ simulateSlowConsumer = false;
+ RunFilterIgnoredMessagesTest(destinationName);
+ }
+
+ /// <summary>
+ /// A slow consumer will trigger the producer flow control on the broker when the destination is
+ /// a queue. It will also trigger the consumer flow control by slowing down the feed to all of the
+ /// consumers on the queue to only send messages as fast as the slowest consumer can run.
+ /// When sending to a topic, the producer will not be slowed down, and consumers will be allowed
+ /// to run as fast as they can go.
+ /// Since this test can take a long time to run, it is marked as explicit.
+ /// </summary>
+ /// <param name="destinationName"></param>
+ [Test, Explicit]
+ public void FilterIgnoredMessagesSlowConsumerTest(
+ [Values(QUEUE_DESTINATION_NAME, TOPIC_DESTINATION_NAME)]
+ string destinationName)
+ {
+ simulateSlowConsumer = true;
+ RunFilterIgnoredMessagesTest(destinationName);
+ }
+
+ public void RunFilterIgnoredMessagesTest(string destinationName)
+ {
TimeSpan ttl = TimeSpan.FromMinutes(30);
const int MaxNumRequests = 100000;
@@ -55,7 +80,7 @@
{
IDestination destination1 = CreateDestination(session1, destinationName);
IDestination destination2 = SessionUtil.GetDestination(session2, destinationName);
- IDestination destination3 = SessionUtil.GetDestination(session3, destinationName); //jdg + "?consumer.prefetchSize=10000");
+ IDestination destination3 = SessionUtil.GetDestination(session3, destinationName);
using(IMessageProducer producer = session1.CreateProducer(destination1))
using(IMessageConsumer consumer1 = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
@@ -103,6 +128,10 @@
}
}
+ // Create a waiting loop that will coordinate the end of the test. It checks
+ // to see that all intended messages were received. It will continue to wait as
+ // long as new messages are being received. If it stops receiving messages before
+ // it receives everything it expects, it will eventually timeout and the test will fail.
int waitCount = 0;
int lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
int lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
@@ -126,7 +155,6 @@
Assert.IsTrue(waitCount <= 30, String.Format("Timeout waiting for all messages to be delivered. Only {0} of {1} non-ignored messages delivered. Only {2} of {3} ignored messages delivered.",
receivedNonIgnoredMsgCount, numNonIgnoredMsgsSent, receivedIgnoredMsgCount, numIgnoredMsgsSent));
- //Console.WriteLine("Waiting ({0}) to receive all non-ignored messages...", waitCount);
Thread.Sleep(1000);
}
@@ -146,8 +174,12 @@
{
receivedIgnoredMsgCount++;
Assert.AreEqual(message.NMSType, "ACTIVE.IGNORE");
- // Simulate a slow consumer
- //Thread.Sleep(10);
+ if(simulateSlowConsumer)
+ {
+ // Simulate a slow consumer It doesn't have to be too slow in a high speed environment
+ // in order to trigger producer flow control.
+ Thread.Sleep(10);
+ }
}
}
}