| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| using System; |
| using System.Threading; |
| using Apache.NMS.Util; |
| using NUnit.Framework; |
| |
| namespace Apache.NMS.Test |
| { |
| //[TestFixture] |
| public class DurableTest : NMSTest |
| { |
| protected static string DURABLE_SELECTOR = "2 > 1"; |
| |
| protected string TEST_CLIENT_AND_CONSUMER_ID; |
| protected string SEND_CLIENT_ID; |
| |
| protected DurableTest(NMSTestSupport testSupport) |
| : base(testSupport) |
| { |
| } |
| |
| //[SetUp] |
| public override void SetUp() |
| { |
| base.SetUp(); |
| |
| TEST_CLIENT_AND_CONSUMER_ID = GetTestClientId(); |
| SEND_CLIENT_ID = GetTestClientId(); |
| } |
| |
| //[Test] |
| public virtual void TestSendWhileClosed( |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] |
| AcknowledgementMode ackMode, string testTopicRef) |
| { |
| try |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID)) |
| { |
| connection.Start(); |
| |
| using(ISession session = connection.CreateSession(ackMode)) |
| { |
| ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef); |
| IMessageProducer producer = session.CreateProducer(topic); |
| |
| producer.DeliveryMode = MsgDeliveryMode.Persistent; |
| |
| ISession consumeSession = connection.CreateSession(ackMode); |
| IMessageConsumer consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false); |
| Thread.Sleep(1000); |
| consumer.Dispose(); |
| consumer = null; |
| |
| ITextMessage message = session.CreateTextMessage("DurableTest-TestSendWhileClosed"); |
| message.Properties.SetString("test", "test"); |
| message.NMSType = "test"; |
| producer.Send(message); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| session.Commit(); |
| } |
| |
| Thread.Sleep(1000); |
| consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false); |
| ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage; |
| msg.Acknowledge(); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| consumeSession.Commit(); |
| } |
| |
| Assert.IsNotNull(msg); |
| Assert.AreEqual(msg.Text, "DurableTest-TestSendWhileClosed"); |
| Assert.AreEqual(msg.NMSType, "test"); |
| Assert.AreEqual(msg.Properties.GetString("test"), "test"); |
| } |
| } |
| } |
| catch(Exception ex) |
| { |
| Assert.Fail(ex.Message); |
| } |
| finally |
| { |
| // Pause to allow Stomp to unregister at the broker. |
| Thread.Sleep(500); |
| |
| UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID); |
| } |
| } |
| |
| //[Test] |
| public void TestDurableConsumerSelectorChange( |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] |
| AcknowledgementMode ackMode, string testTopicRef) |
| { |
| try |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID)) |
| { |
| connection.Start(); |
| using(ISession session = connection.CreateSession(ackMode)) |
| { |
| ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef); |
| IMessageProducer producer = session.CreateProducer(topic); |
| IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='red'", false); |
| |
| producer.DeliveryMode = MsgDeliveryMode.Persistent; |
| |
| // Send the messages |
| ITextMessage sendMessage = session.CreateTextMessage("1st"); |
| sendMessage.Properties["color"] = "red"; |
| producer.Send(sendMessage); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| session.Commit(); |
| } |
| |
| ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage; |
| Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message."); |
| Assert.AreEqual("1st", receiveMsg.Text); |
| Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match"); |
| receiveMsg.Acknowledge(); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| session.Commit(); |
| } |
| |
| // Change the subscription, allowing some time for the Broker to purge the |
| // consumers resources. |
| consumer.Dispose(); |
| Thread.Sleep(1000); |
| |
| consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='blue'", false); |
| |
| sendMessage = session.CreateTextMessage("2nd"); |
| sendMessage.Properties["color"] = "red"; |
| producer.Send(sendMessage); |
| sendMessage = session.CreateTextMessage("3rd"); |
| sendMessage.Properties["color"] = "blue"; |
| producer.Send(sendMessage); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| session.Commit(); |
| } |
| |
| // Selector should skip the 2nd message. |
| receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage; |
| Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message."); |
| Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message."); |
| Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match"); |
| receiveMsg.Acknowledge(); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| session.Commit(); |
| } |
| |
| // Make sure there are no pending messages. |
| Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription."); |
| } |
| } |
| } |
| catch(Exception ex) |
| { |
| Assert.Fail(ex.Message); |
| } |
| finally |
| { |
| // Pause to allow Stomp to unregister at the broker. |
| Thread.Sleep(500); |
| |
| UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID); |
| } |
| } |
| |
| //[Test] |
| public void TestDurableConsumer( |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] |
| AcknowledgementMode ackMode, string testDurableTopicName) |
| { |
| try |
| { |
| RegisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, testDurableTopicName, TEST_CLIENT_AND_CONSUMER_ID, null, false); |
| RunTestDurableConsumer(testDurableTopicName, ackMode); |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| RunTestDurableConsumer(testDurableTopicName, ackMode); |
| } |
| } |
| finally |
| { |
| // Pause to allow Stomp to unregister at the broker. |
| Thread.Sleep(500); |
| |
| UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID); |
| } |
| } |
| |
| protected void RunTestDurableConsumer(string topicName, AcknowledgementMode ackMode) |
| { |
| SendDurableMessage(topicName); |
| SendDurableMessage(topicName); |
| |
| using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID)) |
| { |
| connection.Start(); |
| using(ISession session = connection.CreateSession(ackMode)) |
| { |
| ITopic topic = SessionUtil.GetTopic(session, topicName); |
| using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false)) |
| { |
| IMessage msg = consumer.Receive(receiveTimeout); |
| Assert.IsNotNull(msg, "Did not receive first durable message."); |
| msg.Acknowledge(); |
| |
| msg = consumer.Receive(receiveTimeout); |
| Assert.IsNotNull(msg, "Did not receive second durable message."); |
| msg.Acknowledge(); |
| |
| if(AcknowledgementMode.Transactional == ackMode) |
| { |
| session.Commit(); |
| } |
| } |
| } |
| } |
| } |
| |
| protected void SendDurableMessage(string topicName) |
| { |
| using(IConnection connection = CreateConnection(SEND_CLIENT_ID)) |
| { |
| connection.Start(); |
| using(ISession session = connection.CreateSession()) |
| { |
| ITopic topic = SessionUtil.GetTopic(session, topicName); |
| using(IMessageProducer producer = session.CreateProducer(topic)) |
| { |
| ITextMessage message = session.CreateTextMessage("Durable Hello"); |
| |
| producer.DeliveryMode = MsgDeliveryMode.Persistent; |
| producer.Send(message); |
| } |
| } |
| } |
| } |
| } |
| } |