blob: 8159610e0f21dc7990e12a485f36ec61cb301e73 [file] [log] [blame]
// /*
// * Licensed to the Apache Software Foundation (ASF) under one or more
// * contributor license agreements. See the NOTICE file distributed with
// * this work for additional information regarding copyright ownership.
// * The ASF licenses this file to You under the Apache License, Version 2.0
// * (the "License"); you may not use this file except in compliance with
// * the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
using System;
using System.Collections.Generic;
using System.Threading;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Transport.Mock;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
public class InactivityMonitorTest
{
private List<Command> received;
private List<Exception> exceptions;
private MockTransport transport = null;
private WireFormatInfo localWireFormatInfo = null;
private CountDownLatch asyncErrorLatch;
public void OnException(ITransport transport, Exception exception)
{
Tracer.Debug("Test: Received Exception from Transport: " + exception );
exceptions.Add( exception );
asyncErrorLatch.countDown();
}
public void OnCommand(ITransport transport, Command command)
{
Tracer.Debug("Test: Received Command from Transport: " + command );
received.Add( command );
}
[SetUp]
public void SetUp()
{
this.received = new List<Command>();
this.exceptions = new List<Exception>();
Uri uri = new Uri("mock://mock?wireformat=openwire");
MockTransportFactory factory = new MockTransportFactory();
this.transport = factory.CompositeConnect( uri ) as MockTransport;
this.localWireFormatInfo = new WireFormatInfo();
this.localWireFormatInfo.Version = 5;
this.localWireFormatInfo.MaxInactivityDuration = 3000;
this.localWireFormatInfo.TightEncodingEnabled = false;
this.asyncErrorLatch = new CountDownLatch(1);
}
[Test]
public void TestCreate()
{
InactivityMonitor monitor = new InactivityMonitor( this.transport );
Assert.IsTrue( monitor.InitialDelayTime == 0 );
Assert.IsTrue( monitor.ReadCheckTime == 0 );
Assert.IsTrue( monitor.WriteCheckTime == 0 );
Assert.IsTrue( monitor.KeepAliveResponseRequired == false );
Assert.IsTrue( monitor.IsDisposed == false );
}
[Test]
public void TestReadTimeout()
{
InactivityMonitor monitor = new InactivityMonitor( this.transport );
monitor.Exception += new ExceptionHandler(OnException);
monitor.Command += new CommandHandler(OnCommand);
// Send the local one for the monitor to record.
monitor.Oneway( this.localWireFormatInfo );
// Should not have timed out on Read yet.
Assert.IsTrue( this.exceptions.Count == 0 );
this.asyncErrorLatch.await(TimeSpan.FromSeconds(10));
// Channel should have been inactive for to long.
Assert.IsTrue( this.exceptions.Count > 0 );
}
[Test]
public void TestWriteMessageFail()
{
this.transport.FailOnKeepAliveInfoSends = true ;
this.transport.NumSentKeepAliveInfosBeforeFail = 4;
InactivityMonitor monitor = new InactivityMonitor( this.transport );
monitor.Exception += new ExceptionHandler(OnException);
monitor.Command += new CommandHandler(OnCommand);
monitor.Start();
// Send the local one for the monitor to record.
monitor.Oneway( this.localWireFormatInfo );
this.asyncErrorLatch.await(TimeSpan.FromSeconds(10));
// Channel should have been inactive for to long.
Assert.IsTrue( this.exceptions.Count > 0 );
try
{
monitor.Oneway(new ActiveMQMessage());
Assert.Fail("Should have thrown an exception");
}
catch
{
}
}
[Test]
public void TestNonFailureSendCase()
{
InactivityMonitor monitor = new InactivityMonitor( this.transport );
monitor.Exception += new ExceptionHandler(OnException);
monitor.Command += new CommandHandler(OnCommand);
monitor.Start();
// Send the local one for the monitor to record.
monitor.Oneway( this.localWireFormatInfo );
ActiveMQMessage message = new ActiveMQMessage();
for( int ix = 0; ix < 20; ++ix )
{
monitor.Oneway( message );
Thread.Sleep( 500 );
this.transport.InjectCommand( message );
Thread.Sleep( 500 );
}
// Channel should have been inactive for to long.
Assert.IsTrue( this.exceptions.Count == 0 );
}
}
}