blob: 4b740b7fb8975bacd0de0bb99de786c4f97f6381 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using Apache.NMS.Util;
using NUnit.Framework;
using NUnit.Framework.Extensions;
using System.Threading;
namespace Apache.NMS.Test
{
[TestFixture]
[Explicit]
public class MessageSelectorTest : NMSTestSupport
{
protected const string QUEUE_DESTINATION_NAME = "queue://MessageSelectorQueue";
protected const string TOPIC_DESTINATION_NAME = "topic://MessageSelectorTopic";
protected const string TEST_CLIENT_ID = "MessageSelectorClientId";
protected const string TEST_CLIENT_ID2 = "MessageSelectorClientId2";
protected const string TEST_CLIENT_ID3 = "MessageSelectorClientId3";
private int receivedNonIgnoredMsgCount = 0;
private int receivedIgnoredMsgCount = 0;
[RowTest]
[Row(MsgDeliveryMode.Persistent, QUEUE_DESTINATION_NAME)]
[Row(MsgDeliveryMode.NonPersistent, QUEUE_DESTINATION_NAME)]
[Row(MsgDeliveryMode.Persistent, TOPIC_DESTINATION_NAME)]
[Row(MsgDeliveryMode.NonPersistent, TOPIC_DESTINATION_NAME)]
public void FilterIgnoredMessagesTest(MsgDeliveryMode deliveryMode, string destinationName)
{
using(IConnection connection1 = CreateConnection(TEST_CLIENT_ID))
using(IConnection connection2 = CreateConnection(TEST_CLIENT_ID2))
using(IConnection connection3 = CreateConnection(TEST_CLIENT_ID3))
{
connection1.Start();
connection2.Start();
connection3.Start();
using(ISession session1 = connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ISession session3 = connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination1 = CreateDestination(session1, destinationName);
IDestination destination2 = SessionUtil.GetDestination(session2, destinationName);
using(IMessageProducer producer = session1.CreateProducer(destination1))
using(IMessageConsumer consumer1 = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
{
const int MaxNumRequests = 100000;
int numNonIgnoredMsgsSent = 0;
int numIgnoredMsgsSent = 0;
producer.DeliveryMode = deliveryMode;
// producer.RequestTimeout = receiveTimeout;
receivedNonIgnoredMsgCount = 0;
receivedIgnoredMsgCount = 0;
consumer1.Listener += new MessageListener(OnNonIgnoredMessage);
IMessageConsumer consumer2 = null;
for(int index = 1; index <= MaxNumRequests; index++)
{
IMessage request = session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index, MaxNumRequests));
// request.NMSTimeToLive = TimeSpan.FromSeconds(10);
if(0 == (index % 2))
{
request.NMSType = "ACTIVE";
numNonIgnoredMsgsSent++;
}
else
{
request.NMSType = "ACTIVE.IGNORE";
numIgnoredMsgsSent++;
}
producer.Send(request);
if(20000 == index)
{
// Start the second consumer
consumer2 = session3.CreateConsumer(destination2, "JMSType LIKE '%IGNORE'");
consumer2.Listener += new MessageListener(OnIgnoredMessage);
}
}
int waitCount = 0;
int lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
int lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent
|| receivedIgnoredMsgCount < numIgnoredMsgsSent)
{
if(lastReceivedINongnoredMsgCount != receivedNonIgnoredMsgCount
|| lastReceivedIgnoredMsgCount != receivedIgnoredMsgCount)
{
// Reset the wait count.
waitCount = 0;
Console.WriteLine("Reset the wait count while we are still receiving msgs.");
Thread.Sleep(2000);
continue;
}
lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
if(waitCount > 60)
{
Assert.Fail(String.Format("Timeout waiting for all messages to be delivered. Only {0} of {1} non-ignored messages delivered. Only {2} of {3} ignored messages delivered.",
receivedNonIgnoredMsgCount, numNonIgnoredMsgsSent, receivedIgnoredMsgCount, numIgnoredMsgsSent));
}
Console.WriteLine("Waiting to receive all non-ignored messages...");
Thread.Sleep(1000);
waitCount++;
}
consumer2.Dispose();
}
}
}
}
protected void OnNonIgnoredMessage(IMessage message)
{
receivedNonIgnoredMsgCount++;
Assert.AreEqual(message.NMSType, "ACTIVE");
}
protected void OnIgnoredMessage(IMessage message)
{
receivedIgnoredMsgCount++;
Assert.AreEqual(message.NMSType, "ACTIVE.IGNORE");
Thread.Sleep(100);
}
}
}