// blob: 413a965df9ba5cbce2e82991afab74f468b2ff1b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Apache.NMS.Util;
using NUnit.Framework;
namespace Apache.NMS.Test
{
//[TestFixture]
//[Category("LongRunning")]
public class MessageSelectorTest : NMSTest
{
// Count of messages handled by OnNonIgnoredMessage; RunFilterIgnoredMessagesTest zeroes it before sending.
private int receivedNonIgnoredMsgCount;
// Count of messages handled by OnIgnoredMessage; RunFilterIgnoredMessagesTest zeroes it before sending.
private int receivedIgnoredMsgCount;
// When true, OnIgnoredMessage pauses briefly on every message to imitate a slow consumer.
private bool simulateSlowConsumer;
/// <summary>
/// Creates the selector test fixture and forwards the supplied support object to the base class.
/// </summary>
/// <param name="testSupport">Support object passed unchanged to the base constructor.</param>
protected MessageSelectorTest(NMSTestSupport testSupport) : base(testSupport)
{
}
/// <summary>
/// Runs the selector filtering scenario with the slow-consumer simulation switched off.
/// </summary>
/// <param name="testDestRef">Destination node reference handed straight to RunFilterIgnoredMessagesTest.</param>
//[Test]
public virtual void TestFilterIgnoredMessages(
    //[Values(SELECTOR_TEST_QUEUE, SELECTOR_TEST_TOPIC)]
    string testDestRef)
{
    // The ignored-message listener reads this flag, so it must be set before the run starts.
    this.simulateSlowConsumer = false;
    this.RunFilterIgnoredMessagesTest(testDestRef);
}
/// <summary>
/// Runs the selector filtering scenario with the slow-consumer simulation switched on.
/// On a queue, a slow consumer triggers producer flow control on the broker, and consumer flow
/// control throttles the feed to every consumer on that queue down to the pace of the slowest one.
/// On a topic, the producer is not slowed down and each consumer runs as fast as it can.
/// This test can take a long time to run.
/// NOTE(review): the earlier comment stated this test is marked explicit, but no such attribute
/// is visible here - confirm in the fixtures that derive from this class.
/// </summary>
/// <param name="testDestRef">Destination node reference handed straight to RunFilterIgnoredMessagesTest.</param>
//[Test]
public virtual void TestFilterIgnoredMessagesSlowConsumer(
    //[Values(SELECTOR_TEST_QUEUE, SELECTOR_TEST_TOPIC)]
    string testDestRef)
{
    // The ignored-message listener reads this flag, so it must be set before the run starts.
    this.simulateSlowConsumer = true;
    this.RunFilterIgnoredMessagesTest(testDestRef);
}
/// <summary>
/// Sends a large batch of text messages to the given destination, alternating the NMSType
/// between "ACTIVE" (even index) and "ACTIVE.IGNORE" (odd index), and verifies that two
/// selector-based consumers between them receive everything that was sent.
/// The first consumer (selector <c>JMSType NOT LIKE '%IGNORE'</c>) listens from the start; the
/// second consumer (selector <c>JMSType LIKE '%IGNORE'</c>) is only attached part-way through
/// the send loop.
/// </summary>
/// <param name="testDestRef">Destination node reference resolved via GetClearDestinationByNodeReference.</param>
public void RunFilterIgnoredMessagesTest(string testDestRef)
{
    const int MaxNumRequests = 100000;
    // Index of the message after which the second (ignored-message) consumer is created.
    const int SecondConsumerStartIndex = 2000;
    // Emit a progress line every this many sent messages.
    const int ProgressLogInterval = 200;
    TimeSpan ttl = TimeSpan.FromMinutes(30);

    using(IConnection connection1 = CreateConnection(GetTestClientId()))
    using(IConnection connection2 = CreateConnection(GetTestClientId()))
    using(IConnection connection3 = CreateConnection(GetTestClientId()))
    {
        connection1.Start();
        connection2.Start();
        connection3.Start();
        using(ISession session1 = connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using(ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
        using(ISession session3 = connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
        {
            IDestination destination1 = GetClearDestinationByNodeReference(session1, testDestRef);
            IDestination destination2 = GetClearDestinationByNodeReference(session2, testDestRef);
            IDestination destination3 = GetClearDestinationByNodeReference(session3, testDestRef);
            using(IMessageProducer producer = session1.CreateProducer(destination1))
            using(IMessageConsumer consumer1 = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
            {
                int numNonIgnoredMsgsSent = 0;
                int numIgnoredMsgsSent = 0;
                IMessageConsumer consumer2 = null;

                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
                receivedNonIgnoredMsgCount = 0;
                receivedIgnoredMsgCount = 0;
                consumer1.Listener += new MessageListener(OnNonIgnoredMessage);

                // The second consumer is created mid-loop, so it cannot live in a using statement.
                // The try/finally guarantees it is disposed even when a send or the timeout
                // assertion throws.
                try
                {
                    for(int index = 1; index <= MaxNumRequests; index++)
                    {
                        IMessage request = session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index, MaxNumRequests));
                        request.NMSTimeToLive = ttl;
                        if(0 == (index % 2))
                        {
                            request.NMSType = "ACTIVE";
                            numNonIgnoredMsgsSent++;
                        }
                        else
                        {
                            request.NMSType = "ACTIVE.IGNORE";
                            numIgnoredMsgsSent++;
                        }

                        if(index % ProgressLogInterval == 0)
                        {
                            Console.WriteLine("{0} Sending message {1}/{2}", DateTime.Now, index, MaxNumRequests);
                        }

                        producer.Send(request);

                        if(SecondConsumerStartIndex == index)
                        {
                            // Start the second consumer
                            if(destination3.IsTopic)
                            {
                                // Reset the ignored message sent count, since all previous messages
                                // will not have been consumed on a topic.
                                numIgnoredMsgsSent = 0;
                            }

                            consumer2 = session3.CreateConsumer(destination3, "JMSType LIKE '%IGNORE'");
                            consumer2.Listener += new MessageListener(OnIgnoredMessage);
                        }
                    }

                    WaitForAllMessages(numNonIgnoredMsgsSent, numIgnoredMsgsSent);
                }
                finally
                {
                    if(consumer2 != null)
                    {
                        consumer2.Dispose();
                    }
                }
            }
        }
    }
}

/// <summary>
/// Blocks until both received-message counters have reached the expected totals. It keeps
/// waiting as long as new messages are arriving; once no progress has been observed for more
/// than the allowed number of consecutive one-second polls, the test fails with a timeout.
/// </summary>
/// <param name="expectedNonIgnored">Number of non-ignored messages that must be received.</param>
/// <param name="expectedIgnored">Number of ignored messages that must be received.</param>
private void WaitForAllMessages(int expectedNonIgnored, int expectedIgnored)
{
    // Maximum number of consecutive polls without progress before the test is failed.
    const int MaxIdleWaits = 30;
    int idleWaits = 0;
    int lastNonIgnored = receivedNonIgnoredMsgCount;
    int lastIgnored = receivedIgnoredMsgCount;

    while(true)
    {
        // The listeners update the counters while this loop runs. Read each counter exactly
        // once per iteration so that the completion check, the progress check and the
        // bookkeeping all work from the same snapshot.
        int currentNonIgnored = receivedNonIgnoredMsgCount;
        int currentIgnored = receivedIgnoredMsgCount;

        if(currentNonIgnored >= expectedNonIgnored && currentIgnored >= expectedIgnored)
        {
            return;
        }

        if(currentNonIgnored != lastNonIgnored || currentIgnored != lastIgnored)
        {
            // Progress was made: reset the wait count.
            idleWaits = 0;
        }
        else
        {
            idleWaits++;
        }

        lastNonIgnored = currentNonIgnored;
        lastIgnored = currentIgnored;

        Assert.IsTrue(idleWaits <= MaxIdleWaits, String.Format("Timeout waiting for all messages to be delivered. Only {0} of {1} non-ignored messages delivered. Only {2} of {3} ignored messages delivered.",
            currentNonIgnored, expectedNonIgnored, currentIgnored, expectedIgnored));
        Thread.Sleep(1000);
    }
}
/// <summary>
/// Message listener that counts each delivered message, checks that its NMSType is "ACTIVE",
/// and logs a progress line every 200 messages.
/// </summary>
/// <param name="message">The message delivered to the listener.</param>
protected void OnNonIgnoredMessage(IMessage message)
{
    // Count first so the message is tallied even if the type check below fails.
    receivedNonIgnoredMsgCount++;

    // NUnit's AreEqual takes (expected, actual); passing them in that order keeps the
    // failure message ("Expected: ... But was: ...") truthful.
    Assert.AreEqual("ACTIVE", message.NMSType);

    if(receivedNonIgnoredMsgCount % 200 == 0)
    {
        Console.WriteLine("{0} Received non ignored message {1}", DateTime.Now, receivedNonIgnoredMsgCount);
    }
}
/// <summary>
/// Message listener that counts each delivered message, checks that its NMSType is
/// "ACTIVE.IGNORE", optionally sleeps to imitate a slow consumer, and logs a progress line
/// every 200 messages.
/// </summary>
/// <param name="message">The message delivered to the listener.</param>
protected void OnIgnoredMessage(IMessage message)
{
    // Count first so the message is tallied even if the type check below fails.
    receivedIgnoredMsgCount++;

    // NUnit's AreEqual takes (expected, actual); passing them in that order keeps the
    // failure message ("Expected: ... But was: ...") truthful.
    Assert.AreEqual("ACTIVE.IGNORE", message.NMSType);

    if(simulateSlowConsumer)
    {
        // Simulate a slow consumer. It doesn't have to be too slow in a high speed environment
        // in order to trigger producer flow control.
        Thread.Sleep(10);
    }

    if(receivedIgnoredMsgCount % 200 == 0)
    {
        Console.WriteLine("{0} Received ignored message {1}", DateTime.Now, receivedIgnoredMsgCount);
    }
}
/// <summary>
/// Verifies that creating a consumer with a syntactically invalid selector is rejected with an
/// InvalidSelectorException. Any other exception type, or no exception at all, fails the test.
/// </summary>
/// <param name="testDestRef">Destination node reference resolved via GetClearDestinationByNodeReference.</param>
//[Test]
public virtual void TestInvalidSelector(
    //[Values(SELECTOR_TEST_QUEUE)]
    string testDestRef)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();
        using(ISession session = connection.CreateSession())
        {
            IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
            string selector = "THIS IS NOT A VALID SELECTOR";
            bool selectorRejected = false;

            try
            {
                // Creation is expected to throw; the using only ensures the consumer is
                // released if the provider unexpectedly accepts the selector.
                using(IMessageConsumer consumer = session.CreateConsumer(destination, selector))
                {
                }
            }
            catch(InvalidSelectorException)
            {
                selectorRejected = true;
            }
            catch(Exception ex)
            {
                Assert.Fail("Wrong Exception Type Thrown: " + ex.GetType().Name);
            }

            // Asserted outside the try block so that the assertion's own exception is not
            // caught by the general handler above and misreported as a wrong exception type.
            Assert.IsTrue(selectorRejected, "Consumer should have thrown an InvalidSelectorException");
        }
    }
}
}
}