/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using Apache.NMS.Util;
using NUnit.Framework;

namespace Apache.NMS.Test
{
	//[TestFixture]
	public class DurableTest : NMSTest
	{
		protected static string DURABLE_SELECTOR = "2 > 1";

		protected string TEST_CLIENT_AND_CONSUMER_ID;
		protected string SEND_CLIENT_ID;

		protected DurableTest(NMSTestSupport testSupport)
			: base(testSupport)
		{
		}

		//[SetUp]
		public override void SetUp()
		{
			base.SetUp();
			
			TEST_CLIENT_AND_CONSUMER_ID = GetTestClientId();
			SEND_CLIENT_ID = GetTestClientId();
		}

		//[Test]
		public virtual void TestSendWhileClosed(
			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
			AcknowledgementMode ackMode, string testTopicRef)
		{
			try
			{				
		        using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
				{
			        connection.Start();

					using(ISession session = connection.CreateSession(ackMode))
					{
						ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef);
						IMessageProducer producer = session.CreateProducer(topic);

						producer.DeliveryMode = MsgDeliveryMode.Persistent;
										
				        ISession consumeSession = connection.CreateSession(ackMode);
				        IMessageConsumer consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
				        Thread.Sleep(1000);
				        consumer.Dispose();
						consumer = null;
				        
						ITextMessage message = session.CreateTextMessage("DurableTest-TestSendWhileClosed");
				        message.Properties.SetString("test", "test");
				        message.NMSType = "test";
				        producer.Send(message);
						if(AcknowledgementMode.Transactional == ackMode)
						{
							session.Commit();
						}
										        
				        Thread.Sleep(1000);
						consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
				        ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
						msg.Acknowledge();
						if(AcknowledgementMode.Transactional == ackMode)
						{
							consumeSession.Commit();
						}
						
						Assert.IsNotNull(msg);
				        Assert.AreEqual(msg.Text, "DurableTest-TestSendWhileClosed");
				        Assert.AreEqual(msg.NMSType, "test");
				        Assert.AreEqual(msg.Properties.GetString("test"), "test");
					}
				}
			}
			catch(Exception ex)
			{
				Assert.Fail(ex.Message);
			}
			finally
			{
                // Pause to allow Stomp to unregister at the broker.
                Thread.Sleep(500);

				UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
			}			
	    }		
		
		//[Test]
		public void TestDurableConsumerSelectorChange(
			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
			AcknowledgementMode ackMode, string testTopicRef)
		{
			try
			{
				using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
				{
					connection.Start();
					using(ISession session = connection.CreateSession(ackMode))
					{
						ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef);
						IMessageProducer producer = session.CreateProducer(topic);
						IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='red'", false);

						producer.DeliveryMode = MsgDeliveryMode.Persistent;

						// Send the messages
						ITextMessage sendMessage = session.CreateTextMessage("1st");
						sendMessage.Properties["color"] = "red";
						producer.Send(sendMessage);
						if(AcknowledgementMode.Transactional == ackMode)
						{
							session.Commit();
						}

						ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
						Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
						Assert.AreEqual("1st", receiveMsg.Text);
						Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
						receiveMsg.Acknowledge();
						if(AcknowledgementMode.Transactional == ackMode)
						{
							session.Commit();
						}

						// Change the subscription, allowing some time for the Broker to purge the
						// consumers resources.
						consumer.Dispose();
                        Thread.Sleep(1000);
						
						consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='blue'", false);

						sendMessage = session.CreateTextMessage("2nd");
						sendMessage.Properties["color"] = "red";
						producer.Send(sendMessage);
						sendMessage = session.CreateTextMessage("3rd");
						sendMessage.Properties["color"] = "blue";
						producer.Send(sendMessage);
						if(AcknowledgementMode.Transactional == ackMode)
						{
							session.Commit();
						}

						// Selector should skip the 2nd message.
						receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
						Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
						Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
						Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
						receiveMsg.Acknowledge();
						if(AcknowledgementMode.Transactional == ackMode)
						{
							session.Commit();
						}

						// Make sure there are no pending messages.
						Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
					}
				}
			}
			catch(Exception ex)
			{
				Assert.Fail(ex.Message);
			}
			finally
			{
                // Pause to allow Stomp to unregister at the broker.
                Thread.Sleep(500);

				UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
			}
		}

		//[Test]
		public void TestDurableConsumer(
			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
			AcknowledgementMode ackMode, string testDurableTopicName)
		{
			try
			{
				RegisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, testDurableTopicName, TEST_CLIENT_AND_CONSUMER_ID, null, false);
				RunTestDurableConsumer(testDurableTopicName, ackMode);
				if(AcknowledgementMode.Transactional == ackMode)
				{
					RunTestDurableConsumer(testDurableTopicName, ackMode);
				}
			}
			finally
			{
                // Pause to allow Stomp to unregister at the broker.
                Thread.Sleep(500);
				
				UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
			}
		}

		protected void RunTestDurableConsumer(string topicName, AcknowledgementMode ackMode)
		{
			SendDurableMessage(topicName);
			SendDurableMessage(topicName);

			using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
			{
				connection.Start();
				using(ISession session = connection.CreateSession(ackMode))
				{
					ITopic topic = SessionUtil.GetTopic(session, topicName);
					using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false))
					{
						IMessage msg = consumer.Receive(receiveTimeout);
						Assert.IsNotNull(msg, "Did not receive first durable message.");
						msg.Acknowledge();

						msg = consumer.Receive(receiveTimeout);
						Assert.IsNotNull(msg, "Did not receive second durable message.");
						msg.Acknowledge();

						if(AcknowledgementMode.Transactional == ackMode)
						{
							session.Commit();
						}
					}
				}
			}
		}

		protected void SendDurableMessage(string topicName)
		{
			using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
			{
				connection.Start();
				using(ISession session = connection.CreateSession())
				{
					ITopic topic = SessionUtil.GetTopic(session, topicName);
					using(IMessageProducer producer = session.CreateProducer(topic))
					{
						ITextMessage message = session.CreateTextMessage("Durable Hello");

						producer.DeliveryMode = MsgDeliveryMode.Persistent;
						producer.Send(message);
					}
				}
			}
		}
	}
}
