|  | /* | 
|  | * Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | * contributor license agreements.  See the NOTICE file distributed with | 
|  | * this work for additional information regarding copyright ownership. | 
|  | * The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | * (the "License"); you may not use this file except in compliance with | 
|  | * the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | using System; | 
|  | using System.Threading.Tasks; | 
|  | using Apache.NMS.ActiveMQ.Commands; | 
|  | using Apache.NMS.ActiveMQ.Transport; | 
|  | using Apache.NMS.ActiveMQ.Transport.Tcp; | 
|  | using Apache.NMS.Test; | 
|  | using NUnit.Framework; | 
|  |  | 
|  | namespace Apache.NMS.ActiveMQ.Test | 
|  | { | 
/// <summary>
/// Tests transacted sessions on a <c>failover:(tcpfaulty://...)</c> connection while the
/// underlying TCP transport is closed, either by <see cref="FailOnCommitTransportHook"/>
/// when a one-phase commit passes through it or by closing the faulty transport directly.
/// </summary>
[TestFixture]
public class FailoverTransactionTest : NMSTestSupport
{
    // Connection under test; assigned by each test and disposed by that test's using block.
    private Connection connection;

    // Set by TransportInterrupted / TransportResumed, which the tests register as
    // connection listeners.
    private bool interrupted = false;
    private bool resumed = false;

    // Latched by FailOnCommitTransportHook so the transport is closed only once per test.
    private bool commitFailed = false;

    // Number of messages sent by the PutMsgIntoQueue overloads that take no explicit count.
    private readonly int MSG_COUNT = 2;
    private readonly String destinationName = "FailoverTransactionTestQ";

    /// <summary>
    /// Runs the base fixture setup and resets the per-test state flags.
    /// </summary>
    [SetUp]
    public override void SetUp()
    {
        base.SetUp();

        this.connection = null;
        this.interrupted = false;
        this.resumed = false;
        this.commitFailed = false;
    }

    /// <summary>
    /// Closes the transport from the faulty transport's <c>OnewayCommandPostProcessor</c>
    /// hook when the commit passes through, expects the commit to be reported as rolled
    /// back, and then expects <c>MSG_COUNT</c> messages to be readable from the queue.
    /// </summary>
    [Test, Timeout(20_000), Ignore("Flaky test, needs investigation")]
    public void FailoverAfterCommitSentTest()
    {
        using (connection = CreateFaultyFailoverConnection())
        {
            ListenForTransportEvents();
            connection.Start();

            TcpFaultyTransport tcpFaulty = NarrowToFaultyTransport();
            tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook;

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(destinationName);
                DeleteQueue(connection, destination);
            }

            Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);

            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
            {
                IDestination destination = session.GetQueue(destinationName);
                PutMsgIntoQueue(session, destination, false);
                AssertCommitRolledBack(session);
            }

            AssertTransportInterruptedAndResumed();

            Tracer.Debug("Test is attempting to read " + MSG_COUNT +
                         " messages from the queue: " + destinationName);

            AssertMessagesAvailable(MSG_COUNT);
        }

        AssertTransportInterruptedAndResumed();
    }

    /// <summary>
    /// Closes the transport from the faulty transport's <c>OnewayCommandPreProcessor</c>
    /// hook when the commit passes through, expects the commit to be reported as rolled
    /// back, and then expects the queue to be empty.
    /// </summary>
    [Test, Timeout(20_000)]
    public void FailoverBeforeCommitSentTest()
    {
        using (connection = CreateFaultyFailoverConnection())
        {
            ListenForTransportEvents();
            connection.Start();

            TcpFaultyTransport tcpFaulty = NarrowToFaultyTransport();
            tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(destinationName);
                PurgeQueue(connection, destination);
            }

            Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);

            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
            {
                IDestination destination = session.GetQueue(destinationName);
                PutMsgIntoQueue(session, destination, false);
                AssertCommitRolledBack(session);
            }

            AssertTransportInterruptedAndResumed();

            Tracer.Debug("Test is attempting to read a message from" +
                         destinationName + " but no messages are expected");

            using (ISession session = connection.CreateSession())
            using (IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName)))
            {
                IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
                Assert.IsNull(msg, "Should not receive a message after commit failed.");
            }
        }

        AssertTransportInterruptedAndResumed();
    }

    /// <summary>
    /// Sends a batch, closes the faulty transport, sends a second batch on the same
    /// transacted session and commits; then expects <c>MSG_COUNT</c> messages to be readable.
    /// </summary>
    [Test, Timeout(20_000)]
    public void FailoverWithShortLivedProducerTest()
    {
        using (connection = CreateFaultyFailoverConnection())
        {
            ListenForTransportEvents();
            connection.Start();

            TcpFaultyTransport tcpFaulty = NarrowToFaultyTransport();

            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(destinationName);
                PurgeQueue(connection, destination);
            }

            Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);

            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
            {
                IDestination destination = session.GetQueue(destinationName);
                PutMsgIntoQueue(session, destination, false);
                // Drop the transport between the two producers; each PutMsgIntoQueue call
                // creates and disposes its own producer.
                tcpFaulty.Close();
                PutMsgIntoQueue(session, destination, false);
                session.Commit();
            }

            AssertTransportInterruptedAndResumed();

            Tracer.Debug("Test is attempting to read " + MSG_COUNT +
                         " messages from the queue: " + destinationName);

            AssertMessagesAvailable(MSG_COUNT);
        }

        AssertTransportInterruptedAndResumed();
    }

    /// <summary>
    /// Receives one message in a transacted session, lets the commit trigger
    /// <see cref="FailOnCommitTransportHook"/>, and then expects a message to be
    /// received again on the same consumer.
    /// </summary>
    [Test, Timeout(20_000)]
    public void TestMessageDeliveredAfterCommitFailsAndRollback()
    {
        using (connection = CreateFaultyFailoverConnection())
        {
            using (ISession session = connection.CreateSession())
            {
                IDestination destination = session.GetQueue(destinationName);
                DeleteQueue(connection, destination);
                PutOneMsgIntoQueue(session, destination);
            }

            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
            {
                connection.Start();

                TcpFaultyTransport tcpFaulty = NarrowToFaultyTransport();
                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;

                // NOTE(review): each Receive below may wait up to 30 s while the test's
                // Timeout is 20 000 ms, so a missing message would presumably surface as a
                // test timeout rather than as the assertion message - confirm and align.
                IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName));
                IMessage message = consumer.Receive(TimeSpan.FromSeconds(30));
                Assert.IsNotNull(message, "Message was not delivered");
                Tracer.Debug("Commiting transaction");

                try
                {
                    Tracer.Info("Now attempting to commit the transaction");
                    session.Commit();
                }
                catch (Exception ex)
                {
                    // The failure is only logged; the redelivery assertion below is the
                    // actual check performed by this test.
                    Tracer.InfoFormat("Commit failed as expected. {0}", ex.Message);
                }

                message = consumer.Receive(TimeSpan.FromSeconds(30));
                Assert.IsNotNull(message, "message was not redilivered");
            }
        }
    }

    /// <summary>Connection-interrupted listener; records that an interruption was reported.</summary>
    public void TransportInterrupted()
    {
        this.interrupted = true;
    }

    /// <summary>Connection-resumed listener; records that a resume was reported.</summary>
    public void TransportResumed()
    {
        this.resumed = true;
    }

    /// <summary>
    /// Creates the connection used by every test: a failover transport wrapping a single
    /// <c>tcpfaulty</c> endpoint, authenticated as guest/guest.
    /// </summary>
    /// <returns>The new, not yet started, connection.</returns>
    private Connection CreateFaultyFailoverConnection()
    {
        string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
        IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(uri));

        // A direct cast fails at this line on a type mismatch instead of yielding a null
        // that is only dereferenced later.
        return (Connection) factory.CreateConnection("guest", "guest");
    }

    /// <summary>
    /// Registers <see cref="TransportInterrupted"/> and <see cref="TransportResumed"/> on
    /// the current connection.
    /// </summary>
    private void ListenForTransportEvents()
    {
        connection.ConnectionInterruptedListener +=
            new ConnectionInterruptedListener(TransportInterrupted);
        connection.ConnectionResumedListener +=
            new ConnectionResumedListener(TransportResumed);
    }

    /// <summary>
    /// Narrows the current connection's transport to its <see cref="TcpFaultyTransport"/>
    /// and asserts that one was found.
    /// </summary>
    private TcpFaultyTransport NarrowToFaultyTransport()
    {
        ITransport transport = connection.ITransport;
        TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
        Assert.IsNotNull(tcpFaulty);
        return tcpFaulty;
    }

    /// <summary>
    /// Commits <paramref name="session"/> and asserts that the commit throws a
    /// <see cref="TransactionRolledBackException"/>.
    /// </summary>
    private void AssertCommitRolledBack(ISession session)
    {
        bool rolledBack = false;

        try
        {
            session.Commit();
        }
        catch (TransactionRolledBackException)
        {
            Tracer.Info("TEST: Caught expected TransactionRolledBackException");
            rolledBack = true;
        }
        catch (Exception ex)
        {
            Assert.Fail("Should have thrown a TransactionRolledBackException, but was: " +
                        ex.GetType().Name);
        }

        // Checked after the try/catch so that the assertion's own exception cannot be
        // intercepted by the general catch clause above and misreported.
        Assert.IsTrue(rolledBack, "Should have thrown a TransactionRolledBackException");
    }

    /// <summary>
    /// Asserts that both the interrupted and the resumed listener have fired.
    /// </summary>
    private void AssertTransportInterruptedAndResumed()
    {
        Assert.IsTrue(this.interrupted, "Connection interruption was not reported.");
        Assert.IsTrue(this.resumed, "Connection resume was not reported.");
    }

    /// <summary>
    /// Opens a fresh session and consumer on the test queue and asserts that
    /// <paramref name="expectedCount"/> messages can be received, allowing 5 seconds each.
    /// </summary>
    private void AssertMessagesAvailable(int expectedCount)
    {
        using (ISession session = connection.CreateSession())
        using (IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName)))
        {
            for (int i = 0; i < expectedCount; ++i)
            {
                IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
                Assert.IsNotNull(msg, "Should receive message[" + (i + 1) + "] after commit failed once.");
            }
        }
    }

    /// <summary>Sends <c>MSG_COUNT</c> messages and commits if the session is transacted.</summary>
    private void PutMsgIntoQueue(ISession session, IDestination destination)
    {
        PutMsgIntoQueue(session, destination, true, MSG_COUNT);
    }

    /// <summary>Sends a single message and commits if the session is transacted.</summary>
    private void PutOneMsgIntoQueue(ISession session, IDestination destination)
    {
        PutMsgIntoQueue(session, destination, true, 1);
    }

    /// <summary>
    /// Sends <c>MSG_COUNT</c> messages; commits only when <paramref name="commit"/> is
    /// true and the session is transacted.
    /// </summary>
    private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit)
    {
        PutMsgIntoQueue(session, destination, commit, MSG_COUNT);
    }

    /// <summary>
    /// Sends <paramref name="count"/> text messages ("Test message 1", "Test message 2", ...)
    /// to <paramref name="destination"/> through a producer that is disposed on return.
    /// </summary>
    /// <param name="session">Session used to create the producer and the message.</param>
    /// <param name="destination">Destination to send to.</param>
    /// <param name="commit">When true and the session is transacted, commit after sending.</param>
    /// <param name="count">Number of messages to send.</param>
    private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit, int count)
    {
        using (IMessageProducer producer = session.CreateProducer(destination))
        {
            // A single message instance is reused; only its text changes between sends.
            ITextMessage message = session.CreateTextMessage();
            for (int i = 0; i < count; ++i)
            {
                message.Text = "Test message " + (i + 1);
                producer.Send(message);
            }

            if (session.Transacted && commit)
            {
                session.Commit();
            }
        }
    }

    /// <summary>
    /// Drains <paramref name="queue"/> by receiving on a new session until a receive
    /// returns null after waiting 500 ms.
    /// </summary>
    public void PurgeQueue(IConnection conn, IDestination queue)
    {
        using (ISession session = conn.CreateSession())
        using (IMessageConsumer consumer = session.CreateConsumer(queue))
        {
            while (consumer.Receive(TimeSpan.FromMilliseconds(500)) != null)
            {
                // Discard the message and keep draining.
            }
        }
    }

    /// <summary>Deletes <paramref name="queue"/> using a new session on <paramref name="connection"/>.</summary>
    private void DeleteQueue(IConnection connection, IDestination queue)
    {
        using (ISession session = connection.CreateSession())
        {
            session.DeleteDestination(queue);
        }
    }

    /// <summary>
    /// Closes the <see cref="TcpTransport"/> found by narrowing the current connection's
    /// transport.
    /// </summary>
    private void BreakConnection()
    {
        TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport;
        Assert.IsNotNull(transport);
        transport.Close();
    }

    /// <summary>
    /// Transport hook that closes the TCP transport the first time a one-phase commit
    /// <see cref="TransactionInfo"/> passes through it; all other commands, and every
    /// command after the first such commit, are ignored.
    /// </summary>
    /// <param name="transport">Transport handed to the hook; must be a <see cref="TcpTransport"/>.</param>
    /// <param name="command">Command being processed.</param>
    /// <exception cref="InvalidOperationException">
    /// A one-phase commit was seen but <paramref name="transport"/> is not a
    /// <see cref="TcpTransport"/>.
    /// </exception>
    public async Task FailOnCommitTransportHook(ITransport transport, Command command)
    {
        if (!commitFailed
            && command is TransactionInfo txInfo
            && txInfo.Type == (byte)TransactionType.CommitOnePhase)
        {
            Tracer.Debug("Exception from the Commit to simulate an connection drop.");
            commitFailed = true;

            TcpTransport tcpTransport = transport as TcpTransport;
            if (tcpTransport == null)
            {
                // Fail with a descriptive error instead of a NullReferenceException.
                throw new InvalidOperationException(
                    "FailOnCommitTransportHook expects a TcpTransport but was given: " +
                    (transport == null ? "null" : transport.GetType().Name));
            }

            tcpTransport.Close();
        }

        await Task.CompletedTask;
    }
}
|  | } |