blob: 1c182388d600ba8155586d7d871edc3b62efa40d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Diagnostics;
using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.Util;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
public class MaxInactivityDurationTest : NMSTestSupport
{
protected static string DESTINATION_NAME = "TEST.MaxInactivityDuration";
protected static string CORRELATION_ID = "MaxInactivityCorrelationID";
[Test]
public void TestMaxInactivityDuration()
{
string testuri = "activemq:tcp://${activemqhost}:61616" +
"?wireFormat.maxInactivityDurationInitialDelay=5000" +
"&wireFormat.maxInactivityDuration=10000" +
"&connection.asyncClose=false";
NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
using(IConnection connection = factory.CreateConnection("", ""))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination = SessionUtil.GetDestination(session, DESTINATION_NAME);
using(IMessageConsumer consumer = session.CreateConsumer(destination))
using(IMessageProducer producer = session.CreateProducer(destination))
{
SendMessage(producer);
IMessage receivedMsg = consumer.Receive(TimeSpan.FromSeconds(5));
Assert.AreEqual(CORRELATION_ID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
// Go inactive...
Thread.Sleep(TimeSpan.FromSeconds(30));
// Send another message.
SendMessage(producer);
receivedMsg = consumer.Receive(TimeSpan.FromSeconds(5));
Assert.AreEqual(CORRELATION_ID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
}
}
}
}
protected void SendMessage(IMessageProducer producer)
{
IMessage request = producer.CreateMessage();
request.NMSCorrelationID = CORRELATION_ID;
request.NMSType = "Test";
producer.Send(request);
}
[Test, Sequential]
public void TestInactivityMonitorThreadLeak(
[Values(0, 1000)]
int inactivityDuration)
{
Process currentProcess = Process.GetCurrentProcess();
Tracer.InfoFormat("Beginning thread count: {0}, handle count: {1}", currentProcess.Threads.Count, currentProcess.HandleCount);
string testuri = string.Format("activemq:tcp://${{activemqhost}}:61616?wireFormat.maxInactivityDuration={0}", inactivityDuration);
NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(testuri));
// We measure the initial resource counts, and then allow a certain fudge factor for the resources
// to fluctuate at run-time. We allow for a certain amount of fluctuation, but if the counts
// grow outside the safe boundaries of delayed garbage collection, then we fail the test.
currentProcess = Process.GetCurrentProcess();
int beginThreadCount = currentProcess.Threads.Count;
int beginHandleCount = currentProcess.HandleCount;
int maxThreadGrowth = 10;
int maxHandleGrowth = 500;
for(int i = 0; i < 200; i++)
{
using(IConnection connection = factory.CreateConnection("ResourceLeakTest", "Password"))
{
using(ISession session = connection.CreateSession())
{
IDestination destination = SessionUtil.GetDestination(session, "topic://TEST.NMSResourceLeak");
using(IMessageConsumer consumer = session.CreateConsumer(destination))
{
connection.Start();
}
}
}
currentProcess = Process.GetCurrentProcess();
int endThreadCount = currentProcess.Threads.Count;
int endHandleCount = currentProcess.HandleCount;
Assert.Less(endThreadCount, beginThreadCount + maxThreadGrowth, string.Format("Thread count grew beyond maximum of {0} on iteration #{1}.", maxThreadGrowth, i));
Assert.Less(endHandleCount, beginHandleCount + maxHandleGrowth, string.Format("Handle count grew beyond maximum of {0} on iteration #{1}.", maxHandleGrowth, i));
}
}
}
}