blob: e7fe9ee2bb2cf1c9179970aff5653bf5920d93dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.Test;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Transport.Failover;
using Apache.NMS.ActiveMQ.Transport.Tcp;
using Apache.NMS.ActiveMQ.Transport.Mock;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
public class FailoverTransportTest
{
// Commands captured by OnOutgoingCommand.
private List<Command> sent;
// Commands captured by OnCommand.
private List<Command> received;
// Exceptions captured by OnException.
private List<Exception> exceptions;
// Upper bound on the number of 100ms polls used while waiting for a connection.
private const int MAX_ATTEMPTS = 30;
// Value that msgCount is reset to before every test (see init).
private const int MESSAGE_COUNT = 5;
// Connection used by the broker-backed test; BreakConnection closes its TCP transport.
private Connection connection;
// Number of messages PutMsgIntoQueue sends; decremented by OnMessage for each text message.
private int msgCount = 5;
// Set by TransportInterrupted.
private bool interrupted = false;
// Set by TransportResumed.
private bool resumed = false;
// Signalled by OnMessage once msgCount reaches zero; awaited by WaitForMessagesToArrive.
protected AutoResetEvent semaphore = new AutoResetEvent(false);
// Next id values handed out by createSession / createConsumer / createProducer.
int sessionIdx = 1;
int consumerIdx = 1;
int producerIdx = 1;
/// <summary>
/// Exception callback wired onto the transport under test: traces the
/// exception and records it in <c>exceptions</c> for later assertions.
/// </summary>
private void OnException(ITransport transport, Exception exception)
{
    string traceLine = "Test: Received Exception from Transport: " + exception;
    Tracer.Debug(traceLine);
    this.exceptions.Add(exception);
}
/// <summary>
/// Command callback wired onto the transport under test: traces the
/// incoming command and appends it to <c>received</c>.
/// </summary>
private void OnCommand(ITransport transport, Command command)
{
    const string format = "Test: Received Command from Transport: {0}";
    Tracer.DebugFormat(format, command);
    this.received.Add(command);
}
/// <summary>
/// Outgoing-command hook installed on the mock transport: traces the
/// command and appends it to <c>sent</c> so tests can count what was sent.
/// </summary>
private void OnOutgoingCommand(ITransport transport, Command command)
{
    const string format = "FailoverTransportTest::OnOutgoingCommand - {0}";
    Tracer.DebugFormat(format, command);
    this.sent.Add(command);
}
/// <summary>
/// Resumed callback: traces the sender's remote address, asserts that the
/// sender is a <c>MockTransport</c>, and points its outgoing-command hook
/// at <c>OnOutgoingCommand</c>.
/// </summary>
private void OnResumed(ITransport sender)
{
    Tracer.DebugFormat("FailoverTransportTest::OnResumed - {0}", sender.RemoteAddress);

    // Re-attach the capture hook to whichever mock transport just resumed.
    var resumedMock = sender as MockTransport;
    Assert.IsNotNull(resumedMock);
    resumedMock.OutgoingCommand = this.OnOutgoingCommand;
}
/// <summary>
/// Interrupted callback wired onto the transports under test.
/// Intentionally does nothing.
/// </summary>
private void OnInterrupted(ITransport sender)
{
}
/// <summary>
/// Follows the chain of <c>Command</c> delegate targets upward from the
/// mock transport through three wrapping transports, asserting each target
/// is an <c>ITransport</c>, and finally checks that the last wrapper's
/// handler target equals the handler target of <paramref name="transport"/>.
/// </summary>
/// <param name="transport">Outermost transport returned by the factory.</param>
/// <param name="mock">Mock transport at the bottom of the stack.</param>
private void VerifyCommandHandlerSetting(ITransport transport, MockTransport mock)
{
    // First hop: the handler installed on the mock.
    var firstWrapper = mock.Command.Target as ITransport;
    Assert.IsNotNull(firstWrapper);

    // Second hop.
    var secondWrapper = firstWrapper.Command.Target as ITransport;
    Assert.IsNotNull(secondWrapper);

    // Third hop.
    var thirdWrapper = secondWrapper.Command.Target as ITransport;
    Assert.IsNotNull(thirdWrapper);

    object expectedTarget = transport.Command.Target;
    object actualTarget = thirdWrapper.Command.Target;
    Assert.AreEqual(expectedTarget, actualTarget);
}
/// <summary>
/// Per-test setup: replaces the capture lists with empty ones, restarts
/// the id counters at 1, and resets the connection, message count and
/// interrupted/resumed flags.
/// </summary>
[SetUp]
public void init()
{
    // Fresh capture lists for the transport callbacks.
    this.sent = new List<Command>();
    this.received = new List<Command>();
    this.exceptions = new List<Exception>();

    // Id generators used by the create* helpers.
    this.sessionIdx = 1;
    this.consumerIdx = 1;
    this.producerIdx = 1;

    // State used by the broker-backed test.
    connection = null;
    msgCount = MESSAGE_COUNT;
    interrupted = false;
    resumed = false;
}
/// <summary>
/// Builds a failover transport over a single mock endpoint with
/// randomization switched off, starts it, and expects it to report a
/// connection after waiting two seconds.
/// </summary>
[Test]
public void FailoverTransportCreateTest()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover, "Failed to create Transport");
        Assert.IsFalse(failover.Randomize, "Failed to properly set Randomize flag");

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(2));

        Assert.IsTrue(failover.IsConnected, "Transport should be connected");
    }
}
/// <summary>
/// Builds a failover transport over two mock endpoints with
/// <c>transport.backup=true</c>, checks the Randomize and Backup options
/// were applied, and expects a connection one second after starting.
/// </summary>
[Test]
public void FailoverTransportWithBackupsTest()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616,mock://localhost:61618)?transport.randomize=false&transport.backup=true");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);
        Assert.IsTrue(failover.Backup);

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));

        Assert.IsTrue(failover.IsConnected);
    }
}
/// <summary>
/// Uses a mock endpoint configured with <c>failOnCreate=true</c> and
/// <c>maxReconnectAttempts=3</c>; after two seconds the test expects at
/// least one recorded exception and a transport that is not connected.
/// </summary>
[Test]
public void FailoverTransportCreateFailOnCreateTest()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +
                            "transport.useExponentialBackOff=false&transport.maxReconnectAttempts=3&transport.initialReconnectDelay=100");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);

        // The URI options must have been applied to the failover transport.
        Assert.IsFalse(failover.UseExponentialBackOff);
        Assert.AreEqual(3, failover.MaxReconnectAttempts);
        Assert.AreEqual(100, failover.InitialReconnectDelay);

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(2));

        Assert.IsNotEmpty(exceptions);
        Assert.IsFalse(failover.IsConnected);
    }
}
/// <summary>
/// Same scenario as the failOnCreate test, but limits retries through
/// <c>startupMaxReconnectAttempts=3</c>; after two seconds the test expects
/// at least one recorded exception and a transport that is not connected.
/// </summary>
[Test]
public void FailoverTransportCreateFailOnCreateTest2()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +
                            "transport.useExponentialBackOff=false&transport.startupMaxReconnectAttempts=3&transport.initialReconnectDelay=100");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);

        // The URI options must have been applied to the failover transport.
        Assert.IsFalse(failover.UseExponentialBackOff);
        Assert.AreEqual(3, failover.StartupMaxReconnectAttempts);
        Assert.AreEqual(100, failover.InitialReconnectDelay);

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(2));

        Assert.IsNotEmpty(exceptions);
        Assert.IsFalse(failover.IsConnected);
    }
}
/// <summary>
/// Starts a failover transport whose only mock endpoint is configured with
/// <c>failOnCreate=true</c> and <c>maxReconnectAttempts=3</c>, then expects
/// a <c>Oneway</c> send to throw <c>IOException</c>, an exception to have
/// been recorded, and the transport to be disconnected.
/// </summary>
[Test]
public void FailoverTransportFailOnSendMessageTest()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +
                            "transport.useExponentialBackOff=false&transport.maxReconnectAttempts=3&transport.initialReconnectDelay=100");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);

        // The URI options must have been applied to the failover transport.
        Assert.IsFalse(failover.UseExponentialBackOff);
        Assert.AreEqual(3, failover.MaxReconnectAttempts);
        Assert.AreEqual(100, failover.InitialReconnectDelay);

        transport.Start();

        var payload = new ActiveMQMessage();
        Assert.Throws<IOException>(() => transport.Oneway(payload), "Oneway call should block and then throw.");

        Assert.IsNotEmpty(exceptions);
        Assert.IsFalse(failover.IsConnected);
    }
}
/// <summary>
/// Configures backups with two mock endpoints, the second of which has
/// <c>failOnCreate=true</c>, and expects the transport to still report a
/// connection two seconds after starting.
/// </summary>
[Test]
public void FailoverTransportFailingBackupsTest()
{
    var brokerUri = new Uri(
        "failover:(mock://localhost:61616," +
        "mock://localhost:61618?transport.failOnCreate=true)?transport.randomize=false&transport.backup=true");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);
        Assert.IsTrue(failover.Backup);

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(2));

        Assert.IsTrue(failover.IsConnected);
    }
}
/// <summary>
/// Connects to a single mock endpoint, sends 1000 messages with
/// <c>Oneway</c>, and expects the mock's outgoing-command hook to have
/// captured exactly that many commands.
/// </summary>
[Test]
public void FailoverTransportSendOnewayMessageTest()
{
    int numMessages = 1000;
    var brokerUri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowedFailover = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowedFailover as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);
        failover.Resumed = this.OnResumed;

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.IsTrue(failover.IsConnected);

        // Locate the active mock and hook its outgoing commands.
        var activeMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
        Assert.IsNotNull(activeMock);
        Assert.AreEqual(61616, activeMock.RemoteAddress.Port);
        VerifyCommandHandlerSetting(transport, activeMock);
        activeMock.OutgoingCommand = this.OnOutgoingCommand;

        var payload = new ActiveMQMessage();
        for(int remaining = numMessages; remaining > 0; --remaining)
        {
            transport.Oneway(payload);
        }

        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.AreEqual(numMessages, sent.Count);
    }
}
/// <summary>
/// Connects to a single mock endpoint, issues four <c>Request</c> calls,
/// and expects the mock's outgoing-command hook to have captured four
/// commands.
/// </summary>
[Test]
public void FailoverTransportSendRequestTest()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowedFailover = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowedFailover as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);
        failover.Resumed = this.OnResumed;

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.IsTrue(failover.IsConnected);

        // Locate the active mock and hook its outgoing commands.
        var activeMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
        Assert.IsNotNull(activeMock);
        Assert.AreEqual(61616, activeMock.RemoteAddress.Port);
        VerifyCommandHandlerSetting(transport, activeMock);
        activeMock.OutgoingCommand = this.OnOutgoingCommand;

        var payload = new ActiveMQMessage();
        int numMessages = 4;
        for(int remaining = numMessages; remaining > 0; --remaining)
        {
            transport.Request(payload);
        }

        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.AreEqual(numMessages, sent.Count);
    }
}
/// <summary>
/// Uses two mock endpoints where the first has
/// <c>failOnSendMessage=true</c>. After connecting to port 61616, each of
/// four <c>Oneway</c> sends must leave the transport on the mock at port
/// 61618, and four commands must have been captured in total.
/// </summary>
[Test]
public void FailoverTransportSendOnewayFailTest()
{
    var brokerUri = new Uri(
        "failover:(mock://localhost:61616?transport.failOnSendMessage=true," +
        "mock://localhost:61618)?transport.randomize=false");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowedFailover = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowedFailover as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);
        failover.Resumed = this.OnResumed;

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.IsTrue(failover.IsConnected);

        // Locate the active mock and hook its outgoing commands.
        var activeMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
        Assert.IsNotNull(activeMock);
        Assert.AreEqual(61616, activeMock.RemoteAddress.Port);
        VerifyCommandHandlerSetting(transport, activeMock);
        activeMock.OutgoingCommand = this.OnOutgoingCommand;

        var payload = new ActiveMQMessage();
        int numMessages = 4;
        int sentSoFar = 0;
        while(sentSoFar < numMessages)
        {
            transport.Oneway(payload);

            // After the send the transport must be on the second endpoint.
            activeMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
            Assert.IsNotNull(activeMock);
            Assert.AreEqual(61618, activeMock.RemoteAddress.Port);

            ++sentSoFar;
        }

        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.AreEqual(numMessages, sent.Count);
    }
}
/// <summary>
/// Configures <c>transport.timeout=1000</c> against a mock endpoint with
/// <c>failOnCreate=true</c>, verifies the timeout option was applied, and
/// expects a <c>Oneway</c> send to throw <c>IOException</c>.
/// </summary>
[Test]
public void FailoverTransportSendOnewayTimeoutTest()
{
    var brokerUri = new Uri(
        "failover:(mock://localhost:61616?transport.failOnCreate=true)?transport.timeout=1000");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.AreEqual(1000, failover.Timeout);

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));

        var payload = new ActiveMQMessage();
        Assert.Throws<IOException>(() => transport.Oneway(payload));
    }
}
/// <summary>
/// Uses two mock endpoints where the first has
/// <c>failOnSendMessage=true</c>, issues four <c>Request</c> calls, and
/// expects four commands to have been captured by the outgoing hook.
/// </summary>
[Test]
public void FailoverTransportSendRequestFailTest()
{
    var brokerUri = new Uri(
        "failover:(mock://localhost:61616?transport.failOnSendMessage=true," +
        "mock://localhost:61618)?transport.randomize=false");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowedFailover = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowedFailover as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);
        failover.Resumed = this.OnResumed;

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.IsTrue(failover.IsConnected);

        // Locate the active mock and hook its outgoing commands.
        var activeMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
        Assert.IsNotNull(activeMock);
        VerifyCommandHandlerSetting(transport, activeMock);
        activeMock.OutgoingCommand = this.OnOutgoingCommand;

        var payload = new ActiveMQMessage();
        int numMessages = 4;
        for(int remaining = numMessages; remaining > 0; --remaining)
        {
            transport.Request(payload);
        }

        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.AreEqual(numMessages, sent.Count);
    }
}
/// <summary>
/// Connects to a mock endpoint on port 61613, injects a
/// <c>ConnectionControl</c> command that lists three other brokers with
/// <c>RebalanceConnection</c> set, and then expects the transport to be
/// attached to the mock on port 61616 named "Reconnected".
/// </summary>
[Test]
public void TestFailoverTransportConnectionControlHandling()
{
    var brokerUri = new Uri("failover:(mock://localhost:61613)?transport.randomize=false");
    string connectedBrokers = "mock://localhost:61616?transport.name=Reconnected," +
                              "mock://localhost:61617?transport.name=Broker1," +
                              "mock://localhost:61618?transport.name=Broker2";
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowedFailover = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowedFailover as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);

        // Poll limit for this test only (50 polls of 100ms each).
        const int pollLimit = 50;

        transport.Start();

        int connectPolls = 0;
        while(connectPolls < pollLimit && !failover.IsConnected)
        {
            Thread.Sleep(100);
            ++connectPolls;
        }

        Assert.IsTrue(failover.IsConnected);

        // Locate the active mock and hook its outgoing commands.
        var initialMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
        Assert.IsNotNull(initialMock);
        Assert.AreEqual(61613, initialMock.RemoteAddress.Port);
        VerifyCommandHandlerSetting(transport, initialMock);
        initialMock.OutgoingCommand = this.OnOutgoingCommand;

        var control = new ConnectionControl();
        control.FaultTolerant = true;
        control.ConnectedBrokers = connectedBrokers;
        control.RebalanceConnection = true;
        initialMock.InjectCommand(control);

        // Allow time for the injected command to be processed.
        Thread.Sleep(TimeSpan.FromSeconds(2));

        // Poll until a mock transport is attached again.
        MockTransport rebalancedMock = null;
        int reconnectPolls = 0;
        while(reconnectPolls < pollLimit)
        {
            rebalancedMock = transport.Narrow(typeof(MockTransport)) as MockTransport;
            if(rebalancedMock != null)
            {
                break;
            }

            Thread.Sleep(100);
            ++reconnectPolls;
        }

        Assert.IsNotNull(rebalancedMock, "Error reconnecting to failover broker.");
        Assert.AreEqual(61616, rebalancedMock.RemoteAddress.Port);
        Assert.AreEqual("Reconnected", rebalancedMock.Name);
    }
}
/// <summary>
/// Enables <c>transport.priorityBackup</c> over two mock endpoints, checks
/// the option was applied, then polls for a connection and expects the
/// transport to report that it is connected to a priority endpoint.
/// </summary>
[Test]
public void TestPriorityBackupConfig()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616,mock://localhost:61618)"+
                            "?transport.randomize=false&transport.priorityBackup=true");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize, "Randomize should be false");
        Assert.IsTrue(failover.PriorityBackup, "Prioirity Backup not set.");

        transport.Start();

        // Poll every 100ms, at most MAX_ATTEMPTS times, for the connection.
        int polls = 0;
        while(polls < MAX_ATTEMPTS && !failover.IsConnected)
        {
            Thread.Sleep(100);
            ++polls;
        }

        Assert.IsTrue(failover.IsConnected);
        Assert.IsTrue(failover.IsConnectedToPriority);
    }
}
/// <summary>
/// Enables <c>transport.priorityBackup</c> with an explicit two-entry
/// <c>transport.priorityURIs</c> list, checks that the configured list
/// splits into two entries, then polls for a connection and expects the
/// transport to report that it is connected to a priority endpoint.
/// </summary>
[Test]
public void TestPriorityBackupConfigPriorityURIsList()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616,mock://localhost:61618)" +
                            "?transport.randomize=false&transport.priorityBackup=true&" +
                            "transport.priorityURIs=mock://localhost:61616,mock://localhost:61618");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize, "Randomize should be false");
        Assert.IsTrue(failover.PriorityBackup, "Prioirity Backup not set.");

        // The priority list must contain both configured URIs.
        string configuredPriorities = failover.PriorityURIs;
        string[] entries = configuredPriorities.Split(',');
        Assert.AreEqual(2, entries.Length, "Bad priorityURIs string: " + configuredPriorities);

        transport.Start();

        // Poll every 100ms, at most MAX_ATTEMPTS times, for the connection.
        int polls = 0;
        while(polls < MAX_ATTEMPTS && !failover.IsConnected)
        {
            Thread.Sleep(100);
            ++polls;
        }

        Assert.IsTrue(failover.IsConnected);
        Assert.IsTrue(failover.IsConnectedToPriority);
    }
}
/// <summary>
/// Drives a sequence of OpenWire commands through a connected failover
/// transport: registers a connection, two sessions, three consumers and a
/// producer via <c>Request</c>, removes the producer, consumers and
/// sessions via <c>disposeOf</c>, and finishes with a <c>ShutdownInfo</c>.
/// </summary>
[Test]
public void OpenWireCommandsTest()
{
    var brokerUri = new Uri("failover:(mock://localhost:61616)?transport.randomize=false");
    var transportFactory = new FailoverTransportFactory();

    using(ITransport transport = transportFactory.CreateTransport(brokerUri))
    {
        Assert.IsNotNull(transport);

        // Wire the test callbacks.
        transport.Command = this.OnCommand;
        transport.Exception = this.OnException;
        transport.Resumed = this.OnResumed;
        transport.Interrupted = this.OnInterrupted;

        object narrowed = transport.Narrow(typeof(FailoverTransport));
        var failover = narrowed as FailoverTransport;
        Assert.IsNotNull(failover);
        Assert.IsFalse(failover.Randomize);

        transport.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        Assert.IsTrue(failover.IsConnected);

        // Register the connection and its sessions.
        ConnectionInfo connectionInfo = createConnection();
        transport.Request(connectionInfo);
        SessionInfo firstSession = createSession(connectionInfo);
        transport.Request(firstSession);
        SessionInfo secondSession = createSession(connectionInfo);
        transport.Request(secondSession);

        // Two consumers on the first session, one on the second.
        ConsumerInfo consumerA = createConsumer(firstSession);
        transport.Request(consumerA);
        ConsumerInfo consumerB = createConsumer(firstSession);
        transport.Request(consumerB);
        ConsumerInfo consumerC = createConsumer(secondSession);
        transport.Request(consumerC);

        // One producer on the second session.
        ProducerInfo producer = createProducer(secondSession);
        transport.Request(producer);

        // Tear down: producer first, then consumers, then sessions.
        disposeOf(transport, producer);

        foreach(ConsumerInfo consumer in new ConsumerInfo[] { consumerA, consumerB, consumerC })
        {
            disposeOf(transport, consumer);
        }

        foreach(SessionInfo session in new SessionInfo[] { firstSession, secondSession })
        {
            disposeOf(transport, session);
        }

        // Signal that the client is finished.
        transport.Oneway(new ShutdownInfo());
    }
}
/// <summary>
/// Builds a <c>ConnectionInfo</c> whose client id and connection id value
/// are each a freshly generated GUID string.
/// </summary>
/// <returns>The populated connection info.</returns>
protected ConnectionInfo createConnection()
{
    var info = new ConnectionInfo();
    info.ClientId = Guid.NewGuid().ToString();

    var id = new ConnectionId();
    id.Value = Guid.NewGuid().ToString();
    info.ConnectionId = id;

    return info;
}
/// <summary>
/// Builds a <c>SessionInfo</c> under the given connection, taking the next
/// value from <c>sessionIdx</c> (which is then incremented) as its id.
/// </summary>
/// <param name="parent">Connection the session belongs to.</param>
/// <returns>The populated session info.</returns>
protected SessionInfo createSession(ConnectionInfo parent)
{
    var info = new SessionInfo();

    var id = new SessionId();
    id.ConnectionId = parent.ConnectionId.Value;
    id.Value = sessionIdx++;
    info.SessionId = id;

    return info;
}
/// <summary>
/// Builds a <c>ConsumerInfo</c> under the given session, taking the next
/// value from <c>consumerIdx</c> (which is then incremented) as its id.
/// </summary>
/// <param name="parent">Session the consumer belongs to.</param>
/// <returns>The populated consumer info.</returns>
protected ConsumerInfo createConsumer(SessionInfo parent)
{
    var info = new ConsumerInfo();

    var id = new ConsumerId();
    id.ConnectionId = parent.SessionId.ConnectionId;
    id.SessionId = parent.SessionId.Value;
    id.Value = consumerIdx++;
    info.ConsumerId = id;

    return info;
}
/// <summary>
/// Builds a <c>ProducerInfo</c> under the given session, taking the next
/// value from <c>producerIdx</c> (which is then incremented) as its id.
/// </summary>
/// <param name="parent">Session the producer belongs to.</param>
/// <returns>The populated producer info.</returns>
protected ProducerInfo createProducer(SessionInfo parent)
{
    var info = new ProducerInfo();

    var id = new ProducerId();
    id.ConnectionId = parent.SessionId.ConnectionId;
    id.SessionId = parent.SessionId.Value;
    id.Value = producerIdx++;
    info.ProducerId = id;

    return info;
}
/// <summary>
/// Sends a one-way <c>RemoveInfo</c> for the given session.
/// </summary>
protected void disposeOf(ITransport transport, SessionInfo session)
{
    var removal = new RemoveInfo();
    removal.ObjectId = session.SessionId;
    transport.Oneway(removal);
}
/// <summary>
/// Sends a one-way <c>RemoveInfo</c> for the given consumer.
/// </summary>
protected void disposeOf(ITransport transport, ConsumerInfo consumer)
{
    var removal = new RemoveInfo();
    removal.ObjectId = consumer.ConsumerId;
    transport.Oneway(removal);
}
/// <summary>
/// Sends a one-way <c>RemoveInfo</c> for the given producer.
/// </summary>
protected void disposeOf(ITransport transport, ProducerInfo producer)
{
    var removal = new RemoveInfo();
    removal.ObjectId = producer.ProducerId;
    transport.Oneway(removal);
}
/// <summary>
/// Broker-backed test: connects over <c>failover:(tcp://...)</c>, purges
/// and refills the "Test" queue, attaches <c>OnMessage</c> as listener,
/// closes the underlying TCP transport, and waits for all messages to be
/// consumed. Afterwards both the interrupted and resumed callbacks must
/// have fired.
/// </summary>
[Test]
public void FailoverTransportFailOnProcessingReceivedMessageTest()
{
    string brokerUri = NMSTestSupport.ReplaceEnvVar("failover:(tcp://${activemqhost}:61616)");
    IConnectionFactory connectionFactory = new ConnectionFactory(brokerUri);

    using(connection = connectionFactory.CreateConnection() as Connection )
    {
        // Track interruption / resumption of the underlying transport.
        connection.ConnectionInterruptedListener += TransportInterrupted;
        connection.ConnectionResumedListener += TransportResumed;
        connection.Start();

        using(ISession session = connection.CreateSession())
        {
            IDestination queue = session.GetQueue("Test?consumer.prefetchSize=1");

            // Start from an empty queue, then load the test messages.
            PurgeQueue(connection, queue);
            PutMsgIntoQueue(session, queue);

            using(IMessageConsumer consumer = session.CreateConsumer(queue))
            {
                consumer.Listener += OnMessage;
                BreakConnection();
                WaitForMessagesToArrive();
            }
        }
    }

    Assert.IsTrue(interrupted);
    Assert.IsTrue(resumed);
}
/// <summary>
/// Points a failover connection at a local port with no broker listening
/// and <c>StartupMaxReconnectAttempts=3</c>, and expects <c>Start</c> to
/// fail with <c>ConnectionClosedException</c> or
/// <c>NMSConnectionException</c>. Any other exception type fails the test,
/// as does an unexpected successful start. The connection is always
/// stopped and disposed, and neither operation may throw.
/// </summary>
[Test]
public void FailStartupMaxReconnectAttempts()
{
    // Connect to valid machine, but on invalid port that doesn't have a broker listening.
    string uri = "failover:(tcp://localhost:31313)?transport.StartupMaxReconnectAttempts=3";
    IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
    IConnection failConnection = factory.CreateConnection();

    // Recorded instead of calling Assert.Fail inside the try block: the
    // assertion exception would otherwise be caught by catch(Exception)
    // below and reported as "Wrong Exception Thrown".
    bool started = false;

    try
    {
        failConnection.Start();
        started = true;
    }
    catch(Apache.NMS.ActiveMQ.ConnectionClosedException)
    {
        // Expected outcome.
    }
    catch(Apache.NMS.NMSConnectionException)
    {
        // Expected outcome.
    }
    catch(Exception e)
    {
        Assert.Fail("Wrong Exception Thrown after max reconnect attempts.\n" +
                    "Exception thrown type: " + e.GetType());
    }
    finally
    {
        try
        {
            failConnection.Stop();
        }
        catch(Exception)
        {
            Assert.Fail("Connection closed exception thrown while closing a connection.");
        }
        finally
        {
            try
            {
                failConnection.Dispose();
            }
            catch(Exception)
            {
                Assert.Fail("Connection closed exception thrown while closing a connection.");
            }
        }
    }

    if(started)
    {
        Assert.Fail("Should not have connected to broker.");
    }
}
/// <summary>
/// Connection-interrupted listener; records that an interruption occurred.
/// </summary>
public void TransportInterrupted()
{
    interrupted = true;
}
/// <summary>
/// Connection-resumed listener; records that the connection resumed.
/// </summary>
public void TransportResumed()
{
    resumed = true;
}
/// <summary>
/// Message listener for the broker-backed test. Messages that are not
/// text messages are ignored. Each text message decrements
/// <c>msgCount</c>; the first one additionally blocks for ten seconds,
/// and the semaphore is signalled once the count reaches zero.
/// </summary>
public void OnMessage(IMessage message)
{
    if(!(message is ITextMessage))
    {
        return;
    }

    --msgCount;

    // Hold on to the first message for 10 seconds so the main thread has
    // time to restart the ActiveMQ broker.
    bool isFirstMessage = (msgCount == MESSAGE_COUNT - 1);
    if(isFirstMessage)
    {
        Thread.Sleep(10000);
    }

    // Every message has been consumed: release the waiting test thread.
    if(msgCount == 0)
    {
        semaphore.Set();
    }
}
/// <summary>
/// Sends <c>msgCount</c> text messages ("Test message 1", "Test message 2",
/// ...) to the destination, reusing one message object and one producer.
/// </summary>
/// <param name="session">Session used to create the producer and message.</param>
/// <param name="destination">Destination to send to.</param>
private void PutMsgIntoQueue(ISession session, IDestination destination)
{
    using(IMessageProducer sender = session.CreateProducer(destination))
    {
        ITextMessage outgoing = session.CreateTextMessage();

        // Message numbers are 1-based.
        for(int number = 1; number <= msgCount; ++number)
        {
            outgoing.Text = "Test message " + number;
            sender.Send(outgoing);
        }
    }
}
/// <summary>
/// Drains the given queue by receiving messages on a temporary session
/// and consumer until a receive with a 500ms timeout returns null.
/// The consumer and session are closed even if receiving throws.
/// </summary>
/// <param name="conn">Connection used to create the temporary session.</param>
/// <param name="queue">Destination to drain.</param>
public void PurgeQueue(IConnection conn, IDestination queue)
{
    ISession session = conn.CreateSession();
    try
    {
        IMessageConsumer consumer = session.CreateConsumer(queue);
        try
        {
            // Discard messages until the queue stays empty for 500ms.
            while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null)
            {
            }
        }
        finally
        {
            consumer.Close();
        }
    }
    finally
    {
        session.Close();
    }
}
/// <summary>
/// Narrows the current connection's transport to its <c>TcpTransport</c>,
/// asserts that one was found, and closes it.
/// </summary>
private void BreakConnection()
{
    var narrowed = connection.ITransport.Narrow(typeof(TcpTransport));
    var tcp = narrowed as TcpTransport;
    Assert.IsNotNull(tcp);
    tcp.Close();
}
/// <summary>
/// Waits up to 30 seconds for the semaphore to be signalled, then asserts
/// that no messages remain outstanding.
/// </summary>
protected void WaitForMessagesToArrive()
{
    const int timeoutMilliseconds = 30000;
    semaphore.WaitOne(timeoutMilliseconds, true);
    Assert.AreEqual(0, msgCount);
}
}
}