blob: 8afe8d31401cab28fed4f95d45620e0eb2228d66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* The main goal of this test list is to make sure that a producer and a consumer with
* separate connections can communicate properly and consitently despite a waiting time
* between messages TestStability500WithSleep would be the important test, as a delay of
* 30s is executed at each 100 (100, 200, 300, ...).
*/
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.Test;
using Apache.NMS.Stomp;
using NUnit.Framework;
using System;
namespace Apache.NMS.Stomp.Test
{
[TestFixture]
public class NMSTester : NMSTestSupport
{
// TODO set proper configuration parameters
private const string destination = "test";
private static int numberOfMessages = 0;
private static IConnection producerConnection = null;
private static IConnection consumerConnection = null;
private static Thread consumerThread = null;
private static Thread producerThread = null;
private static long consumerMessageCounter = 0;
private static long producerMessageCounter = 0;
private static string possibleConsumerException = "";
private static string possibleProducerException = "";
private static bool consumerReady = false;
[SetUp]
public void Init()
{
if (producerConnection != null)
{
producerConnection.Close();
}
if (consumerConnection != null)
{
consumerConnection.Close();
}
if (consumerThread != null)
{
consumerThread.Abort();
}
if (producerThread != null)
{
producerThread.Abort();
}
producerConnection = null;
consumerConnection = null;
consumerThread = null;
producerThread = null;
producerConnection = CreateConnection();
consumerConnection = CreateConnection();
numberOfMessages = 0;
consumerMessageCounter = 0;
producerMessageCounter = 0;
possibleConsumerException = "";
possibleProducerException = "";
consumerReady = false;
//Giving time for the topic to clear out
Thread.Sleep(2500);
}
[TearDown]
public void Dispose()
{
if (producerConnection != null)
{
producerConnection.Close();
}
if (consumerConnection != null)
{
consumerConnection.Close();
}
if (consumerThread != null)
{
consumerThread.Abort();
}
if (producerThread != null)
{
producerThread.Abort();
}
producerConnection = null;
consumerConnection = null;
consumerThread = null;
producerThread = null;
}
[Test]
public void TestStability5Continuous()
{
numberOfMessages = 5;
consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();
producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadContinuous));
producerThread.Start();
Thread.Sleep(100);
Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);
while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}
Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}
[Test]
public void TestStability50Continuous()
{
numberOfMessages = 50;
consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();
producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadContinuous));
producerThread.Start();
Thread.Sleep(100);
Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);
while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}
Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}
[Test]
public void TestStability500Continuous()
{
numberOfMessages = 500; //At 100,200,300, ... a delay of 30 seconds is executed in the producer to cause an unexpected disconnect, due to a malformed ACK?
consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();
producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadContinuous));
producerThread.Start();
Thread.Sleep(100);
Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);
while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}
Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}
[Test]
public void TestStability500WithSleep()
{
numberOfMessages = 500; //At 100,200,300, ... a delay of 30 seconds is executed in the producer to cause an unexpected disconnect, due to a malformed ACK?
consumerThread = new Thread(new ThreadStart(NMSTester.ConsumerThread));
consumerThread.Start();
producerThread = new Thread(new ThreadStart(NMSTester.ProducerThreadWithSleep));
producerThread.Start();
Thread.Sleep(100);
Assert.IsTrue(consumerThread.IsAlive && producerThread.IsAlive);
while (consumerThread.IsAlive && producerThread.IsAlive)
{
Thread.Sleep(100);
}
Assert.AreEqual("", possibleConsumerException);
Assert.AreEqual("", possibleProducerException);
Assert.AreEqual(numberOfMessages, producerMessageCounter);
Assert.AreEqual(numberOfMessages, consumerMessageCounter);
}
public static void Main(string[] args)
{
}
#region Consumer
private static void ConsumerThread()
{
ISession session = consumerConnection.CreateSession();
IMessageConsumer consumer;
IDestination dest = session.GetTopic(destination);
consumer = session.CreateConsumer(dest);
consumer.Listener += new MessageListener(OnMessage);
consumerConnection.ExceptionListener += new ExceptionListener(OnConsumerExceptionListener);
consumerConnection.Start();
consumerReady = true;
while (true)
{
Thread.Sleep(100);
}
}
private static void OnMessage(IMessage receivedMsg)
{
consumerMessageCounter++;
}
public static void OnConsumerExceptionListener(Exception ex)
{
possibleConsumerException = ex.Message;
Thread.CurrentThread.Abort();
}
#endregion Consumer
#region Producer
private static void ProducerThreadWithSleep()
{
ISession session = producerConnection.CreateSession();
IMessageProducer producer;
IDestination dest = session.GetTopic(destination);
producer = session.CreateProducer(dest);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producerConnection.ExceptionListener += new ExceptionListener(OnProducerExceptionListener);
producerConnection.Start();
ITextMessage message;
while (!consumerReady)
{
Thread.Sleep(100);
}
for (int c = 0; c < numberOfMessages; c++)
{
message = session.CreateTextMessage(c.ToString());
message.NMSType = "testType";
producer.Send(message);
producerMessageCounter++;
//Focal point of this test; induce a "long" delay between two messages without any other communication on the topic, Note that thse delays occure only at each 100, and that messages can be sent after the delay before a possible disconnect
if ((c + 1) % 100 == 0)
{
Thread.Sleep(30000);
}
else
{
Thread.Sleep(10);
}
}
}
private static void ProducerThreadContinuous()
{
ISession session = producerConnection.CreateSession();
IMessageProducer producer;
IDestination dest = session.GetTopic(destination);
producer = session.CreateProducer(dest);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
producerConnection.ExceptionListener += new ExceptionListener(OnProducerExceptionListener);
producerConnection.Start();
ITextMessage message;
while (!consumerReady)
{
Thread.Sleep(100);
}
for (int c = 0; c < numberOfMessages; c++)
{
message = session.CreateTextMessage(c.ToString());
message.NMSType = "testType";
producer.Send(message);
producerMessageCounter++;
Thread.Sleep(10);
}
}
public static void OnProducerExceptionListener(Exception ex)
{
possibleProducerException = ex.Message;
Thread.CurrentThread.Abort();
}
#endregion Producer
}
}