blob: 408b748983b43509951cfd687b66cf8b24195656 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.Test;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Transport.Failover;
using Apache.NMS.ActiveMQ.Transport.Tcp;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
public class FailoverTransactionTest : NMSTestSupport
{
// Connection under test; every test assigns it in its own using block and
// SetUp resets it to null.
private Connection connection;

// Raised to true by TransportInterrupted.
private bool interrupted;

// Raised to true by TransportResumed.
private bool resumed;

// Raised to true by FailOnCommitTransportHook once it has closed the
// transport, so the hook acts at most once per test.
private bool commitFailed;

// Number of messages sent by the PutMsgIntoQueue overloads that take no
// explicit count, and the number the tests try to read back.
private readonly int MSG_COUNT = 2;

// Name of the queue used by every test in this fixture.
private readonly string destinationName = "FailoverTransactionTestQ";
/// <summary>
/// Runs the base fixture set-up, then clears the per-test state held by this
/// class: the connection reference and the interrupted, resumed and
/// commitFailed flags.
/// </summary>
[SetUp]
public override void SetUp()
{
    base.SetUp();

    connection = null;
    interrupted = resumed = commitFailed = false;
}
/// <summary>
/// Registers FailOnCommitTransportHook as the faulty transport's
/// OnewayCommandPostProcessor, sends MSG_COUNT messages in a transacted
/// session and commits. The test requires that Commit() throws
/// TransactionRolledBackException, that the connection reported both an
/// interruption and a resume, and that MSG_COUNT messages can afterwards be
/// received from the queue.
/// </summary>
/// <remarks>
/// NOTE(review): the post-processor is presumably invoked after the command
/// has been written to the wire - confirm against TcpFaultyTransport.
/// </remarks>
[Test]
public void FailoverAfterCommitSentTest()
{
    string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
    IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
    using(connection = factory.CreateConnection() as Connection)
    {
        // The 'as' cast yields null if the factory returns another IConnection type.
        Assert.IsNotNull(connection, "Factory did not create an Apache.NMS.ActiveMQ.Connection");

        connection.ConnectionInterruptedListener +=
            new ConnectionInterruptedListener(TransportInterrupted);
        connection.ConnectionResumedListener +=
            new ConnectionResumedListener(TransportResumed);
        connection.Start();

        ITransport transport = connection.ITransport;
        TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
        Assert.IsNotNull(tcpFaulty);
        tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook;

        // Start from a clean queue.
        using(ISession session = connection.CreateSession())
        {
            IDestination destination = session.GetQueue(destinationName);
            DeleteQueue(connection, destination);
        }

        Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);

        using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination destination = session.GetQueue(destinationName);
            PutMsgIntoQueue(session, destination, false);

            // The outcome is recorded in a flag and asserted after the try
            // block. An Assert.Fail placed inside the try would itself be
            // caught by the generic handler and reported as an unexpected
            // exception type, hiding the real reason for the failure.
            bool rolledBack = false;
            try
            {
                session.Commit();
            }
            catch(TransactionRolledBackException)
            {
                rolledBack = true;
                Tracer.Info("TEST: Caught expected TransactionRolledBackException");
            }
            catch(Exception ex)
            {
                Assert.Fail("Should have thrown a TransactionRolledBackException, but was: " +
                            ex.GetType().Name);
            }

            Assert.IsTrue(rolledBack, "Should have thrown a TransactionRolledBackException");
        }

        Assert.IsTrue(this.interrupted);
        Assert.IsTrue(this.resumed);

        Tracer.Debug("Test is attempting to read " + MSG_COUNT +
                     " messages from the queue: " + destinationName);

        using(ISession session = connection.CreateSession())
        {
            IDestination destination = session.GetQueue(destinationName);
            // Dispose the consumer explicitly, as PurgeQueue does.
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                for(int i = 0; i < MSG_COUNT; ++i)
                {
                    IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
                    Assert.IsNotNull(msg, "Should receive message[" + (i + 1) + "] after commit failed once.");
                }
            }
        }
    }

    Assert.IsTrue(this.interrupted);
    Assert.IsTrue(this.resumed);
}
/// <summary>
/// Registers FailOnCommitTransportHook as the faulty transport's
/// OnewayCommandPreProcessor, sends MSG_COUNT messages in a transacted
/// session and commits. The test requires that Commit() throws
/// TransactionRolledBackException, that the connection reported both an
/// interruption and a resume, and that no message can be received from the
/// queue afterwards.
/// </summary>
/// <remarks>
/// NOTE(review): the pre-processor is presumably invoked before the command
/// is written to the wire - confirm against TcpFaultyTransport.
/// </remarks>
[Test]
public void FailoverBeforeCommitSentTest()
{
    string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
    IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
    using(connection = factory.CreateConnection() as Connection)
    {
        // The 'as' cast yields null if the factory returns another IConnection type.
        Assert.IsNotNull(connection, "Factory did not create an Apache.NMS.ActiveMQ.Connection");

        connection.ConnectionInterruptedListener +=
            new ConnectionInterruptedListener(TransportInterrupted);
        connection.ConnectionResumedListener +=
            new ConnectionResumedListener(TransportResumed);
        connection.Start();

        ITransport transport = connection.ITransport;
        TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
        Assert.IsNotNull(tcpFaulty);
        tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;

        // Drain anything left in the queue by earlier tests.
        using(ISession session = connection.CreateSession())
        {
            IDestination destination = session.GetQueue(destinationName);
            PurgeQueue(connection, destination);
        }

        Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);

        using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination destination = session.GetQueue(destinationName);
            PutMsgIntoQueue(session, destination, false);

            // The outcome is recorded in a flag and asserted after the try
            // block. An Assert.Fail placed inside the try would be caught by
            // the generic handler, and the actual exception type would be lost.
            bool rolledBack = false;
            try
            {
                session.Commit();
            }
            catch(TransactionRolledBackException)
            {
                rolledBack = true;
            }
            catch(Exception ex)
            {
                Assert.Fail("Should have thrown a TransactionRolledBackException, but was: " +
                            ex.GetType().Name);
            }

            Assert.IsTrue(rolledBack, "Should have thrown a TransactionRolledBackException");
        }

        Assert.IsTrue(this.interrupted);
        Assert.IsTrue(this.resumed);

        Tracer.Debug("Test is attempting to read a message from " +
                     destinationName + " but no messages are expected");

        using(ISession session = connection.CreateSession())
        {
            IDestination destination = session.GetQueue(destinationName);
            // Dispose the consumer explicitly, as PurgeQueue does.
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
                Assert.IsNull(msg, "Should not receive a message after commit failed.");
            }
        }
    }

    Assert.IsTrue(this.interrupted);
    Assert.IsTrue(this.resumed);
}
/// <summary>
/// Sends a batch of messages in a transacted session, closes the faulty
/// transport, sends a second batch through the same session and commits.
/// The test then requires that the connection reported an interruption and a
/// resume, and that MSG_COUNT messages can be received from the queue.
/// </summary>
[Test]
public void FailoverWithShortLivedProducerTest()
{
    string brokerUri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
    IConnectionFactory connectionFactory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(brokerUri));

    using(connection = connectionFactory.CreateConnection() as Connection)
    {
        connection.ConnectionInterruptedListener += TransportInterrupted;
        connection.ConnectionResumedListener += TransportResumed;
        connection.Start();

        TcpFaultyTransport faultyTransport =
            connection.ITransport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
        Assert.IsNotNull(faultyTransport);

        // Drain anything left in the queue by earlier tests.
        using(ISession purgeSession = connection.CreateSession())
        {
            PurgeQueue(connection, purgeSession.GetQueue(destinationName));
        }

        Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);

        using(ISession txSession = connection.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination queue = txSession.GetQueue(destinationName);

            // One batch before the transport is closed, one after; a single
            // commit covers the session.
            PutMsgIntoQueue(txSession, queue, false);
            faultyTransport.Close();
            PutMsgIntoQueue(txSession, queue, false);

            txSession.Commit();
        }

        Assert.IsTrue(this.interrupted);
        Assert.IsTrue(this.resumed);

        Tracer.Debug("Test is attempting to read " + MSG_COUNT +
                     " messages from the queue: " + destinationName);

        using(ISession readSession = connection.CreateSession())
        {
            IMessageConsumer consumer = readSession.CreateConsumer(readSession.GetQueue(destinationName));

            // 1-based counter so it can be used directly in the failure text.
            for(int ordinal = 1; ordinal <= MSG_COUNT; ordinal++)
            {
                IMessage received = consumer.Receive(TimeSpan.FromSeconds(5));
                Assert.IsNotNull(received, "Should receive message[" + ordinal + "] after commit failed once.");
            }
        }
    }

    Assert.IsTrue(this.interrupted);
    Assert.IsTrue(this.resumed);
}
/// <summary>
/// Puts one message on the queue, receives it in a transacted session, and
/// commits while FailOnCommitTransportHook (registered as the faulty
/// transport's OnewayCommandPreProcessor) closes the transport. The test
/// requires that the hook fired and that the message is delivered again to
/// the same consumer afterwards.
/// </summary>
/// <remarks>
/// An exception thrown by Commit() is logged rather than asserted on; the
/// original test treats it as the expected outcome without enforcing it.
/// </remarks>
[Test]
public void TestMessageDeliveredAfterCommitFailsAndRollback()
{
    string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
    IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
    using(connection = factory.CreateConnection() as Connection)
    {
        // The 'as' cast yields null if the factory returns another IConnection type.
        Assert.IsNotNull(connection, "Factory did not create an Apache.NMS.ActiveMQ.Connection");

        // Reset the queue and seed it with the single message under test.
        using(ISession session = connection.CreateSession())
        {
            IDestination destination = session.GetQueue(destinationName);
            DeleteQueue(connection, destination);
            PutOneMsgIntoQueue(session, destination);
        }

        using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
        {
            connection.Start();

            ITransport transport = connection.ITransport;
            TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
            Assert.IsNotNull(tcpFaulty);
            tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;

            // Dispose the consumer explicitly, as PurgeQueue does.
            using(IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName)))
            {
                IMessage message = consumer.Receive(TimeSpan.FromSeconds(30));
                Assert.IsNotNull(message, "Message was not delivered");

                Tracer.Debug("Commiting transaction");

                try
                {
                    Tracer.Info("Now attempting to commit the transaction");
                    session.Commit();
                }
                catch(Exception ex)
                {
                    Tracer.InfoFormat("Commit failed as expected. {0}", ex.Message);
                }

                // Fail fast if the fault was never injected. Without this
                // check the test would only fail after the 30 second receive
                // timeout below, with a message that blames redelivery.
                Assert.IsTrue(this.commitFailed,
                              "The transport hook never saw the commit, so no failure was simulated");

                message = consumer.Receive(TimeSpan.FromSeconds(30));
                Assert.IsNotNull(message, "message was not redilivered");
            }
        }
    }
}
/// <summary>
/// Handler the tests attach to the connection's ConnectionInterruptedListener;
/// records that it was called by setting the interrupted flag.
/// </summary>
public void TransportInterrupted()
{
    interrupted = true;
}
/// <summary>
/// Handler the tests attach to the connection's ConnectionResumedListener;
/// records that it was called by setting the resumed flag.
/// </summary>
public void TransportResumed()
{
    resumed = true;
}
/// <summary>
/// Sends MSG_COUNT messages to the destination with the commit flag set.
/// </summary>
private void PutMsgIntoQueue(ISession session, IDestination destination)
{
    const bool commitAfterSend = true;
    PutMsgIntoQueue(session, destination, commitAfterSend, MSG_COUNT);
}

/// <summary>
/// Sends exactly one message to the destination with the commit flag set.
/// </summary>
private void PutOneMsgIntoQueue(ISession session, IDestination destination)
{
    const bool commitAfterSend = true;
    const int singleMessage = 1;
    PutMsgIntoQueue(session, destination, commitAfterSend, singleMessage);
}

/// <summary>
/// Sends MSG_COUNT messages to the destination, passing the caller's commit
/// flag through to the four-argument overload.
/// </summary>
private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit)
{
    int messageCount = MSG_COUNT;
    PutMsgIntoQueue(session, destination, commit, messageCount);
}
/// <summary>
/// Creates a producer for the destination, sends <paramref name="count"/>
/// text messages ("Test message 1", "Test message 2", ...) and disposes the
/// producer. The session is committed before the producer is disposed when
/// it is transacted and <paramref name="commit"/> is true.
/// </summary>
/// <param name="session">Session used to create the producer and the message.</param>
/// <param name="destination">Destination the messages are sent to.</param>
/// <param name="commit">Whether to commit a transacted session after sending.</param>
/// <param name="count">Number of messages to send.</param>
private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit, int count)
{
    using(IMessageProducer producer = session.CreateProducer(destination))
    {
        // A single message instance is reused; only its text changes per send.
        ITextMessage message = session.CreateTextMessage();

        int sequence = 1;
        while(sequence <= count)
        {
            message.Text = "Test message " + sequence;
            producer.Send(message);
            sequence++;
        }

        bool shouldCommit = session.Transacted && commit;
        if(shouldCommit)
        {
            session.Commit();
        }
    }
}
/// <summary>
/// Empties the queue by receiving from it on a temporary session until a
/// receive with a 500 ms timeout returns null.
/// </summary>
/// <param name="conn">Connection used to create the temporary session.</param>
/// <param name="queue">Queue to drain.</param>
public void PurgeQueue(IConnection conn, IDestination queue)
{
    using(ISession session = conn.CreateSession())
    {
        using(IMessageConsumer consumer = session.CreateConsumer(queue))
        {
            IMessage drained;
            do
            {
                drained = consumer.Receive(TimeSpan.FromMilliseconds(500));
            }
            while(drained != null);
        }
    }
}
/// <summary>
/// Deletes the given destination through a temporary session, which is
/// disposed afterwards whether or not the delete succeeds.
/// </summary>
/// <param name="connection">Connection used to create the temporary session.</param>
/// <param name="queue">Destination to delete.</param>
private void DeleteQueue(IConnection connection, IDestination queue)
{
    ISession session = connection.CreateSession();
    try
    {
        session.DeleteDestination(queue);
    }
    finally
    {
        if(session != null)
        {
            session.Dispose();
        }
    }
}
/// <summary>
/// Narrows the current connection's transport to a TcpTransport, asserts
/// that one was found, and closes it.
/// </summary>
private void BreakConnection()
{
    object narrowed = this.connection.ITransport.Narrow(typeof(TcpTransport));
    TcpTransport tcpTransport = narrowed as TcpTransport;

    Assert.IsNotNull(tcpTransport);
    tcpTransport.Close();
}
/// <summary>
/// Transport hook that simulates a dropped connection. The first time it is
/// handed a TransactionInfo whose Type is CommitOnePhase it sets commitFailed
/// and closes the transport; once commitFailed is set, every later call
/// returns immediately. All other commands are ignored.
/// </summary>
/// <param name="transport">Transport the command is passing through; must be a TcpTransport.</param>
/// <param name="command">Command being sent.</param>
/// <exception cref="InvalidOperationException">
/// A one-phase commit was seen but <paramref name="transport"/> is not a TcpTransport.
/// </exception>
public void FailOnCommitTransportHook(ITransport transport, Command command)
{
    if(commitFailed)
    {
        return;
    }

    // A single 'as' replaces the original 'is' test followed by 'as'.
    TransactionInfo txInfo = command as TransactionInfo;
    if(txInfo == null || txInfo.Type != (byte) TransactionType.CommitOnePhase)
    {
        return;
    }

    // Validate the transport before touching any state. The original code
    // dereferenced the result of the 'as' cast unchecked, which would throw
    // a bare NullReferenceException after commitFailed had already been set.
    TcpTransport tcpTransport = transport as TcpTransport;
    if(tcpTransport == null)
    {
        throw new InvalidOperationException(
            "FailOnCommitTransportHook requires a TcpTransport but was given: " +
            (transport == null ? "null" : transport.GetType().FullName));
    }

    Tracer.Debug("Exception from the Commit to simulate an connection drop.");

    // The flag is set before Close(), as in the original, so any call made
    // while the transport is closing returns at the guard above.
    commitFailed = true;
    tcpTransport.Close();
}
}
}