| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using Apache.NMS; |
| using Apache.NMS.Util; |
| using Apache.NMS.Test; |
| using Apache.NMS.ActiveMQ; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.ActiveMQ.Transport; |
| using Apache.NMS.ActiveMQ.Transport.Failover; |
| using Apache.NMS.ActiveMQ.Transport.Tcp; |
| using NUnit.Framework; |
| |
| namespace Apache.NMS.ActiveMQ.Test |
| { |
| [TestFixture] |
| public class FailoverTransactionTest : NMSTestSupport |
| { |
| private Connection connection; |
| private bool interrupted = false; |
| private bool resumed = false; |
| private bool commitFailed = false; |
| |
| private readonly int MSG_COUNT = 2; |
| private readonly String destinationName = "FailoverTransactionTestQ"; |
| |
| [SetUp] |
| public override void SetUp() |
| { |
| base.SetUp(); |
| |
| this.connection = null; |
| this.interrupted = false; |
| this.resumed = false; |
| this.commitFailed = false; |
| } |
| |
| [Test] |
| public void FailoverAfterCommitSentTest() |
| { |
| string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)"; |
| IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri)); |
| using(connection = factory.CreateConnection() as Connection) |
| { |
| connection.ConnectionInterruptedListener += |
| new ConnectionInterruptedListener(TransportInterrupted); |
| connection.ConnectionResumedListener += |
| new ConnectionResumedListener(TransportResumed); |
| |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook; |
| |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| DeleteQueue(connection, destination); |
| } |
| |
| Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName); |
| |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| PutMsgIntoQueue(session, destination, false); |
| |
| try |
| { |
| session.Commit(); |
| Assert.Fail("Should have thrown a TransactionRolledBackException"); |
| } |
| catch(TransactionRolledBackException) |
| { |
| Tracer.Info("TEST: Caught expected TransactionRolledBackException"); |
| } |
| catch(Exception ex) |
| { |
| Assert.Fail("Should have thrown a TransactionRolledBackException, but was: " + |
| ex.GetType().Name); |
| } |
| } |
| |
| Assert.IsTrue(this.interrupted); |
| Assert.IsTrue(this.resumed); |
| |
| Tracer.Debug("Test is attempting to read " + MSG_COUNT + |
| " messages from the queue: " + destinationName); |
| |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| for (int i = 0; i < MSG_COUNT; ++i) |
| { |
| IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5)); |
| Assert.IsNotNull(msg, "Should receive message[" + (i + 1) + "] after commit failed once."); |
| } |
| } |
| } |
| |
| Assert.IsTrue(this.interrupted); |
| Assert.IsTrue(this.resumed); |
| } |
| |
| [Test] |
| public void FailoverBeforeCommitSentTest() |
| { |
| string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)"; |
| IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri)); |
| using(connection = factory.CreateConnection() as Connection) |
| { |
| connection.ConnectionInterruptedListener += |
| new ConnectionInterruptedListener(TransportInterrupted); |
| connection.ConnectionResumedListener += |
| new ConnectionResumedListener(TransportResumed); |
| |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; |
| |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| PurgeQueue(connection, destination); |
| } |
| |
| Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName); |
| |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| PutMsgIntoQueue(session, destination, false); |
| |
| try |
| { |
| session.Commit(); |
| Assert.Fail("Should have thrown a TransactionRolledBackException"); |
| } |
| catch(TransactionRolledBackException) |
| { |
| } |
| catch |
| { |
| Assert.Fail("Should have thrown a TransactionRolledBackException"); |
| } |
| } |
| |
| Assert.IsTrue(this.interrupted); |
| Assert.IsTrue(this.resumed); |
| |
| Tracer.Debug("Test is attempting to read a message from" + |
| destinationName + " but no messages are expected"); |
| |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5)); |
| Assert.IsNull(msg, "Should not receive a message after commit failed."); |
| } |
| } |
| |
| Assert.IsTrue(this.interrupted); |
| Assert.IsTrue(this.resumed); |
| } |
| |
| [Test] |
| public void FailoverWithShortLivedProducerTest() |
| { |
| string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)"; |
| IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri)); |
| using(connection = factory.CreateConnection() as Connection) |
| { |
| connection.ConnectionInterruptedListener += |
| new ConnectionInterruptedListener(TransportInterrupted); |
| connection.ConnectionResumedListener += |
| new ConnectionResumedListener(TransportResumed); |
| |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| PurgeQueue(connection, destination); |
| } |
| |
| Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName); |
| |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| PutMsgIntoQueue(session, destination, false); |
| tcpFaulty.Close(); |
| PutMsgIntoQueue(session, destination, false); |
| session.Commit(); |
| } |
| |
| Assert.IsTrue(this.interrupted); |
| Assert.IsTrue(this.resumed); |
| |
| Tracer.Debug("Test is attempting to read " + MSG_COUNT + |
| " messages from the queue: " + destinationName); |
| |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| for (int i = 0; i < MSG_COUNT; ++i) |
| { |
| IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5)); |
| Assert.IsNotNull(msg, "Should receive message[" + (i + 1) + "] after commit failed once."); |
| } |
| } |
| } |
| |
| Assert.IsTrue(this.interrupted); |
| Assert.IsTrue(this.resumed); |
| } |
| |
| [Test] |
| public void TestMessageDeliveredAfterCommitFailsAndRollback() |
| { |
| string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)"; |
| IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri)); |
| using(connection = factory.CreateConnection() as Connection) |
| { |
| using(ISession session = connection.CreateSession()) |
| { |
| IDestination destination = session.GetQueue(destinationName); |
| DeleteQueue(connection, destination); |
| PutOneMsgIntoQueue(session, destination); |
| } |
| |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) |
| { |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; |
| |
| IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName)); |
| IMessage message = consumer.Receive(TimeSpan.FromSeconds(30)); |
| Assert.IsNotNull(message, "Message was not delivered"); |
| Tracer.Debug("Commiting transaction"); |
| |
| try |
| { |
| Tracer.Info("Now attempting to commit the transaction"); |
| session.Commit(); |
| } |
| catch (Exception ex) |
| { |
| Tracer.InfoFormat("Commit failed as expected. {0}", ex.Message); |
| } |
| |
| message = consumer.Receive(TimeSpan.FromSeconds(30)); |
| Assert.IsNotNull(message, "message was not redilivered"); |
| } |
| } |
| } |
| |
| public void TransportInterrupted() |
| { |
| this.interrupted = true; |
| } |
| |
| public void TransportResumed() |
| { |
| this.resumed = true; |
| } |
| |
| private void PutMsgIntoQueue(ISession session, IDestination destination) |
| { |
| PutMsgIntoQueue(session, destination, true, MSG_COUNT); |
| } |
| |
| private void PutOneMsgIntoQueue(ISession session, IDestination destination) |
| { |
| PutMsgIntoQueue(session, destination, true, 1); |
| } |
| |
| private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit) |
| { |
| PutMsgIntoQueue(session, destination, commit, MSG_COUNT); |
| } |
| |
| private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit, int count) |
| { |
| using(IMessageProducer producer = session.CreateProducer(destination)) |
| { |
| ITextMessage message = session.CreateTextMessage(); |
| for(int i = 0; i < count; ++i) |
| { |
| message.Text = "Test message " + (i + 1); |
| producer.Send(message); |
| } |
| |
| if (session.Transacted && commit) |
| { |
| session.Commit(); |
| } |
| } |
| } |
| |
| public void PurgeQueue(IConnection conn, IDestination queue) |
| { |
| using(ISession session = conn.CreateSession()) |
| { |
| using(IMessageConsumer consumer = session.CreateConsumer(queue)) |
| while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null) |
| { |
| } |
| } |
| } |
| |
| private void DeleteQueue(IConnection connection, IDestination queue) |
| { |
| using (ISession session = connection.CreateSession()) |
| { |
| session.DeleteDestination(queue); |
| } |
| } |
| |
| private void BreakConnection() |
| { |
| TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport; |
| Assert.IsNotNull(transport); |
| transport.Close(); |
| } |
| |
| public void FailOnCommitTransportHook(ITransport transport, Command command) |
| { |
| if (commitFailed) |
| { |
| return; |
| } |
| |
| if (command is TransactionInfo) |
| { |
| TransactionInfo txInfo = command as TransactionInfo; |
| if (txInfo.Type == (byte)TransactionType.CommitOnePhase) |
| { |
| Tracer.Debug("Exception from the Commit to simulate an connection drop."); |
| commitFailed = true; |
| TcpTransport tcpTransport = transport as TcpTransport; |
| tcpTransport.Close(); |
| } |
| } |
| } |
| } |
| } |
| |