/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System; | |
using System.Threading; | |
using Apache.NMS.Util; | |
using NUnit.Framework; | |
namespace Apache.NMS.Test
{
    /// <summary>
    /// Exercises message selectors on a destination: one consumer selects messages whose
    /// JMSType does not end in "IGNORE" and a second consumer selects the ones that do.
    /// The test passes once both consumers have received every message they were expected to get.
    /// </summary>
    //[TestFixture]
    //[Category("LongRunning")]
    public class MessageSelectorTest : NMSTest
    {
        /// <summary>Index of the sent message after which the second consumer is started.</summary>
        private const int SecondConsumerStartIndex = 2000;

        /// <summary>Number of consecutive polls without progress tolerated before the test fails.</summary>
        private const int MaxIdleWaitCount = 30;

        /// <summary>Delay between two polls of the received-message counters.</summary>
        private const int PollIntervalMilliseconds = 1000;

        /// <summary>Per-message delay applied by the "ignored" listener when simulating a slow consumer.</summary>
        private const int SlowConsumerDelayMilliseconds = 10;

        // The counters are written from the message listeners and polled from the test thread,
        // so every access goes through Interlocked (see ReadCounter).
        // NOTE(review): listener callbacks are presumably invoked on provider dispatch threads - confirm.
        private int receivedNonIgnoredMsgCount = 0;
        private int receivedIgnoredMsgCount = 0;

        // Set by the test thread before any consumer exists and read from the listener.
        private volatile bool simulateSlowConsumer = false;

        protected MessageSelectorTest(NMSTestSupport testSupport)
            : base(testSupport)
        {
        }

        /// <summary>
        /// Runs the selector test with consumers that process messages at full speed.
        /// </summary>
        /// <param name="testDestinationURI">URI of the destination the test sends to and consumes from.</param>
        //[Test]
        public virtual void TestFilterIgnoredMessages(
            //[Values(SELECTOR_TEST_QUEUE_URI, SELECTOR_TEST_TOPIC_URI)]
            string testDestinationURI)
        {
            simulateSlowConsumer = false;
            RunFilterIgnoredMessagesTest(testDestinationURI);
        }

        /// <summary>
        /// A slow consumer will trigger the producer flow control on the broker when the destination is
        /// a queue. It will also trigger the consumer flow control by slowing down the feed to all of the
        /// consumers on the queue to only send messages as fast as the slowest consumer can run.
        /// When sending to a topic, the producer will not be slowed down, and consumers will be allowed
        /// to run as fast as they can go.
        /// This test can take a long time to run.
        /// </summary>
        /// <param name="testDestinationURI">URI of the destination the test sends to and consumes from.</param>
        //[Test]
        public virtual void TestFilterIgnoredMessagesSlowConsumer(
            //[Values(SELECTOR_TEST_QUEUE_URI, SELECTOR_TEST_TOPIC_URI)]
            string testDestinationURI)
        {
            simulateSlowConsumer = true;
            RunFilterIgnoredMessagesTest(testDestinationURI);
        }

        /// <summary>
        /// Sends a fixed number of text messages, alternating between the types "ACTIVE" and
        /// "ACTIVE.IGNORE", and waits until the two selector-based consumers have received
        /// every message they are expected to receive.
        /// </summary>
        /// <param name="testDestinationURI">URI of the destination the test sends to and consumes from.</param>
        public void RunFilterIgnoredMessagesTest(string testDestinationURI)
        {
            TimeSpan ttl = TimeSpan.FromMinutes(30);
            const int MaxNumRequests = 100000;

            using(IConnection connection1 = CreateConnection(GetTestClientId()))
            using(IConnection connection2 = CreateConnection(GetTestClientId()))
            using(IConnection connection3 = CreateConnection(GetTestClientId()))
            {
                connection1.Start();
                connection2.Start();
                connection3.Start();

                using(ISession session1 = connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
                using(ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
                using(ISession session3 = connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    IDestination destination1 = GetClearDestination(session1, testDestinationURI);
                    IDestination destination2 = GetClearDestination(session2, testDestinationURI);
                    IDestination destination3 = GetClearDestination(session3, testDestinationURI);

                    using(IMessageProducer producer = session1.CreateProducer(destination1))
                    using(IMessageConsumer consumer1 = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
                    {
                        int numNonIgnoredMsgsSent = 0;
                        int numIgnoredMsgsSent = 0;

                        producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

                        // Reset the counters before the first listener is attached.
                        Interlocked.Exchange(ref receivedNonIgnoredMsgCount, 0);
                        Interlocked.Exchange(ref receivedIgnoredMsgCount, 0);

                        consumer1.Listener += new MessageListener(OnNonIgnoredMessage);
                        IMessageConsumer consumer2 = null;

                        try
                        {
                            for(int index = 1; index <= MaxNumRequests; index++)
                            {
                                IMessage request = session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index, MaxNumRequests));

                                request.NMSTimeToLive = ttl;
                                if(0 == (index % 2))
                                {
                                    request.NMSType = "ACTIVE";
                                    numNonIgnoredMsgsSent++;
                                }
                                else
                                {
                                    request.NMSType = "ACTIVE.IGNORE";
                                    numIgnoredMsgsSent++;
                                }

                                producer.Send(request);

                                if(SecondConsumerStartIndex == index)
                                {
                                    // Start the second consumer
                                    if(destination3.IsTopic)
                                    {
                                        // Reset the ignored message sent count, since all previous messages
                                        // will not have been consumed on a topic.
                                        numIgnoredMsgsSent = 0;
                                    }

                                    consumer2 = session3.CreateConsumer(destination3, "JMSType LIKE '%IGNORE'");
                                    consumer2.Listener += new MessageListener(OnIgnoredMessage);
                                }
                            }

                            // Create a waiting loop that will coordinate the end of the test. It checks
                            // to see that all intended messages were received. It will continue to wait as
                            // long as new messages are being received. If it stops receiving messages before
                            // it receives everything it expects, it will eventually timeout and the test will fail.
                            int waitCount = 0;
                            int lastReceivedNonIgnoredMsgCount = ReadCounter(ref receivedNonIgnoredMsgCount);
                            int lastReceivedIgnoredMsgCount = ReadCounter(ref receivedIgnoredMsgCount);

                            while(true)
                            {
                                // Take one snapshot per iteration so the comparison, the bookkeeping
                                // and the failure message all describe the same state.
                                int currentNonIgnoredMsgCount = ReadCounter(ref receivedNonIgnoredMsgCount);
                                int currentIgnoredMsgCount = ReadCounter(ref receivedIgnoredMsgCount);

                                if(currentNonIgnoredMsgCount >= numNonIgnoredMsgsSent
                                    && currentIgnoredMsgCount >= numIgnoredMsgsSent)
                                {
                                    break;
                                }

                                if(lastReceivedNonIgnoredMsgCount != currentNonIgnoredMsgCount
                                    || lastReceivedIgnoredMsgCount != currentIgnoredMsgCount)
                                {
                                    // Progress was made: reset the wait count.
                                    waitCount = 0;
                                }
                                else
                                {
                                    waitCount++;
                                }

                                lastReceivedNonIgnoredMsgCount = currentNonIgnoredMsgCount;
                                lastReceivedIgnoredMsgCount = currentIgnoredMsgCount;

                                Assert.IsTrue(waitCount <= MaxIdleWaitCount, String.Format("Timeout waiting for all messages to be delivered. Only {0} of {1} non-ignored messages delivered.  Only {2} of {3} ignored messages delivered.",
                                    currentNonIgnoredMsgCount, numNonIgnoredMsgsSent, currentIgnoredMsgCount, numIgnoredMsgsSent));
                                Thread.Sleep(PollIntervalMilliseconds);
                            }
                        }
                        finally
                        {
                            // Release the second consumer even when sending or the timeout assertion fails.
                            if(consumer2 != null)
                            {
                                consumer2.Dispose();
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Listener for the consumer whose selector excludes types ending in "IGNORE".
        /// Counts the message and checks that its type is "ACTIVE".
        /// </summary>
        /// <param name="message">The message delivered to the consumer.</param>
        protected void OnNonIgnoredMessage(IMessage message)
        {
            Interlocked.Increment(ref receivedNonIgnoredMsgCount);
            // NOTE(review): an assertion failing here is raised from the listener callback;
            // confirm that the provider surfaces it to the test runner.
            Assert.AreEqual("ACTIVE", message.NMSType);
        }

        /// <summary>
        /// Listener for the consumer whose selector matches types ending in "IGNORE".
        /// Counts the message, checks that its type is "ACTIVE.IGNORE" and, when a slow
        /// consumer is being simulated, delays before returning.
        /// </summary>
        /// <param name="message">The message delivered to the consumer.</param>
        protected void OnIgnoredMessage(IMessage message)
        {
            Interlocked.Increment(ref receivedIgnoredMsgCount);
            Assert.AreEqual("ACTIVE.IGNORE", message.NMSType);

            if(simulateSlowConsumer)
            {
                // Simulate a slow consumer. It doesn't have to be too slow in a high speed environment
                // in order to trigger producer flow control.
                Thread.Sleep(SlowConsumerDelayMilliseconds);
            }
        }

        /// <summary>
        /// Reads a counter that is updated through <see cref="Interlocked"/> so that the
        /// test thread observes the latest value written by the listeners.
        /// </summary>
        /// <param name="counter">The counter field to read.</param>
        /// <returns>The current value of the counter.</returns>
        private static int ReadCounter(ref int counter)
        {
            // CompareExchange(0, 0) never changes the value; it is used only as a fenced read.
            return Interlocked.CompareExchange(ref counter, 0, 0);
        }
    }
}