/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
using System; | |
using System.Threading; | |
using Apache.NMS.Util; | |
using NUnit.Framework; | |
namespace Apache.NMS.Test | |
{ | |
//[TestFixture] | |
public class DurableTest : NMSTest | |
{ | |
protected static string DURABLE_SELECTOR = "2 > 1"; | |
protected string TEST_CLIENT_AND_CONSUMER_ID; | |
protected string SEND_CLIENT_ID; | |
protected DurableTest(NMSTestSupport testSupport) | |
: base(testSupport) | |
{ | |
} | |
//[SetUp] | |
public override void SetUp() | |
{ | |
base.SetUp(); | |
TEST_CLIENT_AND_CONSUMER_ID = GetTestClientId(); | |
SEND_CLIENT_ID = GetTestClientId(); | |
} | |
//[Test] | |
public virtual void TestSendWhileClosed( | |
//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
// AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
AcknowledgementMode ackMode, string testTopicRef) | |
{ | |
try | |
{ | |
using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID)) | |
{ | |
connection.Start(); | |
using(ISession session = connection.CreateSession(ackMode)) | |
{ | |
ITopic topic = (ITopic)GetClearDestination(session, DestinationType.Topic, testTopicRef); | |
IMessageProducer producer = session.CreateProducer(topic); | |
producer.DeliveryMode = MsgDeliveryMode.Persistent; | |
ISession consumeSession = connection.CreateSession(ackMode); | |
IMessageConsumer consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false); | |
Thread.Sleep(1000); | |
consumer.Dispose(); | |
consumer = null; | |
ITextMessage message = session.CreateTextMessage("DurableTest-TestSendWhileClosed"); | |
message.Properties.SetString("test", "test"); | |
message.NMSType = "test"; | |
producer.Send(message); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
session.Commit(); | |
} | |
Thread.Sleep(1000); | |
consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false); | |
ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; | |
msg.Acknowledge(); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
consumeSession.Commit(); | |
} | |
Assert.IsNotNull(msg); | |
Assert.AreEqual(msg.Text, "DurableTest-TestSendWhileClosed"); | |
Assert.AreEqual(msg.NMSType, "test"); | |
Assert.AreEqual(msg.Properties.GetString("test"), "test"); | |
} | |
} | |
} | |
catch(Exception ex) | |
{ | |
Assert.Fail(ex.Message); | |
} | |
finally | |
{ | |
// Pause to allow Stomp to unregister at the broker. | |
Thread.Sleep(500); | |
UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID); | |
} | |
} | |
//[Test] | |
public void TestDurableConsumerSelectorChange( | |
//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
// AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
AcknowledgementMode ackMode, string testTopicRef) | |
{ | |
try | |
{ | |
using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID)) | |
{ | |
connection.Start(); | |
using(ISession session = connection.CreateSession(ackMode)) | |
{ | |
ITopic topic = (ITopic)GetClearDestination(session, DestinationType.Topic, testTopicRef); | |
IMessageProducer producer = session.CreateProducer(topic); | |
IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='red'", false); | |
producer.DeliveryMode = MsgDeliveryMode.Persistent; | |
// Send the messages | |
ITextMessage sendMessage = session.CreateTextMessage("1st"); | |
sendMessage.Properties["color"] = "red"; | |
producer.Send(sendMessage); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
session.Commit(); | |
} | |
ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage; | |
Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message."); | |
Assert.AreEqual("1st", receiveMsg.Text); | |
Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match"); | |
receiveMsg.Acknowledge(); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
session.Commit(); | |
} | |
// Change the subscription, allowing some time for the Broker to purge the | |
// consumers resources. | |
consumer.Dispose(); | |
Thread.Sleep(1000); | |
consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='blue'", false); | |
sendMessage = session.CreateTextMessage("2nd"); | |
sendMessage.Properties["color"] = "red"; | |
producer.Send(sendMessage); | |
sendMessage = session.CreateTextMessage("3rd"); | |
sendMessage.Properties["color"] = "blue"; | |
producer.Send(sendMessage); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
session.Commit(); | |
} | |
// Selector should skip the 2nd message. | |
receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage; | |
Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message."); | |
Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message."); | |
Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match"); | |
receiveMsg.Acknowledge(); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
session.Commit(); | |
} | |
// Make sure there are no pending messages. | |
Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription."); | |
} | |
} | |
} | |
catch(Exception ex) | |
{ | |
Assert.Fail(ex.Message); | |
} | |
finally | |
{ | |
// Pause to allow Stomp to unregister at the broker. | |
Thread.Sleep(500); | |
UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID); | |
} | |
} | |
//[Test] | |
public void TestDurableConsumer( | |
//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
// AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
AcknowledgementMode ackMode, string testDurableTopicName) | |
{ | |
try | |
{ | |
RegisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, testDurableTopicName, TEST_CLIENT_AND_CONSUMER_ID, null, false); | |
RunTestDurableConsumer(testDurableTopicName, ackMode); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
RunTestDurableConsumer(testDurableTopicName, ackMode); | |
} | |
} | |
finally | |
{ | |
// Pause to allow Stomp to unregister at the broker. | |
Thread.Sleep(500); | |
UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID); | |
} | |
} | |
protected void RunTestDurableConsumer(string topicName, AcknowledgementMode ackMode) | |
{ | |
SendDurableMessage(topicName); | |
SendDurableMessage(topicName); | |
using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID)) | |
{ | |
connection.Start(); | |
using(ISession session = connection.CreateSession(ackMode)) | |
{ | |
ITopic topic = SessionUtil.GetTopic(session, topicName); | |
using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false)) | |
{ | |
IMessage msg = consumer.Receive(receiveTimeout); | |
Assert.IsNotNull(msg, "Did not receive first durable message."); | |
msg.Acknowledge(); | |
msg = consumer.Receive(receiveTimeout); | |
Assert.IsNotNull(msg, "Did not receive second durable message."); | |
msg.Acknowledge(); | |
if(AcknowledgementMode.Transactional == ackMode) | |
{ | |
session.Commit(); | |
} | |
} | |
} | |
} | |
} | |
protected void SendDurableMessage(string topicName) | |
{ | |
using(IConnection connection = CreateConnection(SEND_CLIENT_ID)) | |
{ | |
connection.Start(); | |
using(ISession session = connection.CreateSession()) | |
{ | |
ITopic topic = SessionUtil.GetTopic(session, topicName); | |
using(IMessageProducer producer = session.CreateProducer(topic)) | |
{ | |
ITextMessage message = session.CreateTextMessage("Durable Hello"); | |
producer.DeliveryMode = MsgDeliveryMode.Persistent; | |
producer.Send(message); | |
} | |
} | |
} | |
} | |
} | |
} |