| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using System.Collections; |
| using Apache.NMS.Util; |
| using Apache.NMS.Test; |
| using NUnit.Framework; |
| using NUnit.Framework.Extensions; |
| |
| namespace Apache.NMS.Stomp.Test |
| { |
| [TestFixture] |
| public class TransactionTest : NMSTestSupport |
| { |
| protected static string DESTINATION_NAME = "TransactionTestDestination"; |
| protected static string TEST_CLIENT_ID = "TransactionTestClientId"; |
| protected static string TEST_CLIENT_ID2 = "TransactionTestClientId2"; |
| |
/// <summary>
/// Sends three messages through a transacted session, committing the first,
/// rolling back the second and committing the third, then checks that the
/// consumer receives the first and third messages in that order.
/// </summary>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
public void TestSendRollback(MsgDeliveryMode deliveryMode)
{
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        using(ISession txSession = conn.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination tempQueue = txSession.CreateTemporaryQueue();

            using(IMessageConsumer receiver = txSession.CreateConsumer(tempQueue))
            using(IMessageProducer sender = txSession.CreateProducer(tempQueue))
            {
                sender.DeliveryMode = deliveryMode;
                sender.RequestTimeout = receiveTimeout;

                // Transaction 1: committed, so it is expected at the consumer.
                ITextMessage committedFirst = txSession.CreateTextMessage("First Message");
                sender.Send(committedFirst);
                txSession.Commit();

                // Transaction 2: rolled back, so it must not show up between
                // the two committed messages.
                ITextMessage discarded = txSession.CreateTextMessage("I'm going to get rolled back.");
                sender.Send(discarded);
                txSession.Rollback();

                // Transaction 3: committed, expected right after the first message.
                ITextMessage committedSecond = txSession.CreateTextMessage("Second Message");
                sender.Send(committedSecond);
                txSession.Commit();

                // The second receive returning "Second Message" is what shows
                // that the rolled-back message was skipped.
                IMessage received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(committedFirst, received, "First message does not match.");

                received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(committedSecond, received, "Second message does not match.");

                // Commit the receives made in this transacted session.
                txSession.Commit();
            }
        }
    }
}
| |
/// <summary>
/// Checks that a message sent in a transacted session which is disposed
/// without calling Commit() is not delivered, while messages committed
/// before it (same session) and after it (a later connection) are.
/// </summary>
/// <remarks>
/// Every connection opened here uses a client id with a random suffix taken
/// from one shared <see cref="Random"/> instance. The method runs once per
/// [Row], so a fixed client id could clash with a connection from the
/// previous run that the broker has not released yet.
/// </remarks>
/// <param name="deliveryMode">Delivery mode assigned to each producer.</param>
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
public void TestSendSessionClose(MsgDeliveryMode deliveryMode)
{
    ITextMessage firstMsgSend;
    ITextMessage secondMsgSend;

    // A single generator yields a different suffix on each call; separate
    // time-seeded instances created close together may repeat a value.
    Random random = new Random();

    using(IConnection connection1 = CreateConnection(TEST_CLIENT_ID + ":" + random.Next()))
    {
        connection1.Start();

        // Purge any messages left on the Destination.
        using(ISession session1 = connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
        {
            IDestination destination1 = CreateDestination(session1, DESTINATION_NAME);
            using(IMessageConsumer consumer = session1.CreateConsumer(destination1))
            {
                IMessage message = null;

                // Keep draining until a receive times out with nothing to return.
                do
                {
                    message = consumer.Receive(TimeSpan.FromMilliseconds(2000));
                }
                while(message != null);
            }
        }

        using(ISession session1 = connection1.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination destination1 = CreateDestination(session1, DESTINATION_NAME);
            using(IMessageConsumer consumer = session1.CreateConsumer(destination1))
            {
                // First producer connection: commits one message, then sends a
                // second one that is never committed because the producer,
                // session and connection are disposed before Commit() is called.
                using(IConnection connection2 = CreateConnection(TEST_CLIENT_ID2 + ":" + random.Next()))
                {
                    connection2.Start();
                    using(ISession session2 = connection2.CreateSession(AcknowledgementMode.Transactional))
                    {
                        IDestination destination2 = SessionUtil.GetDestination(session2, DESTINATION_NAME);
                        using(IMessageProducer producer = session2.CreateProducer(destination2))
                        {
                            producer.DeliveryMode = deliveryMode;
                            producer.RequestTimeout = receiveTimeout;
                            firstMsgSend = session2.CreateTextMessage("First Message");
                            producer.Send(firstMsgSend);
                            session2.Commit();

                            ITextMessage rollbackMsg = session2.CreateTextMessage("I'm going to get rolled back.");
                            producer.Send(rollbackMsg);
                        }
                    }
                }

                // Second producer connection: sends and commits one message.
                using(IConnection connection2 = CreateConnection(TEST_CLIENT_ID2 + ":" + random.Next()))
                {
                    connection2.Start();
                    using(ISession session2 = connection2.CreateSession(AcknowledgementMode.Transactional))
                    {
                        IDestination destination2 = SessionUtil.GetDestination(session2, DESTINATION_NAME);
                        using(IMessageProducer producer = session2.CreateProducer(destination2))
                        {
                            producer.DeliveryMode = deliveryMode;
                            producer.RequestTimeout = receiveTimeout;
                            secondMsgSend = session2.CreateTextMessage("Second Message");
                            producer.Send(secondMsgSend);
                            session2.Commit();
                        }
                    }
                }

                // Check the consumer to verify which messages were actually received.
                // The second receive returning "Second Message" shows that the
                // uncommitted message was not delivered in between.
                IMessage received = consumer.Receive(receiveTimeout);
                AssertTextMessageEqual(firstMsgSend, received, "First message does not match.");

                received = consumer.Receive(receiveTimeout);
                AssertTextMessageEqual(secondMsgSend, received, "Second message does not match.");

                // Commit the receives made in the consuming transacted session.
                session1.Commit();
            }
        }
    }
}
| |
/// <summary>
/// Commits two messages, receives and commits the first, receives the
/// second, rolls the session back and checks that the second message is
/// delivered again.
/// </summary>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
public void TestReceiveRollback(MsgDeliveryMode deliveryMode)
{
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        using(ISession txSession = conn.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination tempQueue = txSession.CreateTemporaryQueue();

            using(IMessageConsumer receiver = txSession.CreateConsumer(tempQueue))
            using(IMessageProducer sender = txSession.CreateProducer(tempQueue))
            {
                sender.DeliveryMode = deliveryMode;
                sender.RequestTimeout = receiveTimeout;

                // Publish both messages inside a single committed transaction.
                ITextMessage first = txSession.CreateTextMessage("First Message");
                sender.Send(first);
                ITextMessage second = txSession.CreateTextMessage("Second Message");
                sender.Send(second);
                txSession.Commit();

                // The first receive is committed, so it should not come back.
                IMessage received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(first, received, "First message does not match.");
                txSession.Commit();

                // The second receive is rolled back instead of committed...
                received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(second, received, "Second message does not match.");
                txSession.Rollback();

                // ...so the same message is expected once more.
                IMessage redelivered = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(second, redelivered, "Rollback message does not match.");
                txSession.Commit();
            }
        }
    }
}
| |
| |
/// <summary>
/// Commits two messages, receives both, rolls the session back and checks
/// that both are delivered again in order with nothing else pending.
/// </summary>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
public void TestReceiveTwoThenRollback(MsgDeliveryMode deliveryMode)
{
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        using(ISession txSession = conn.CreateSession(AcknowledgementMode.Transactional))
        {
            IDestination tempQueue = txSession.CreateTemporaryQueue();

            using(IMessageConsumer receiver = txSession.CreateConsumer(tempQueue))
            using(IMessageProducer sender = txSession.CreateProducer(tempQueue))
            {
                sender.DeliveryMode = deliveryMode;
                sender.RequestTimeout = receiveTimeout;

                // Publish both messages inside a single committed transaction.
                ITextMessage first = txSession.CreateTextMessage("First Message");
                sender.Send(first);
                ITextMessage second = txSession.CreateTextMessage("Second Message");
                sender.Send(second);
                txSession.Commit();

                // First pass: consume both without committing.
                IMessage received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(first, received, "First message does not match.");
                received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(second, received, "Second message does not match.");

                // Rolling back should hand both messages to the consumer again.
                txSession.Rollback();

                IMessage redelivered = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(first, redelivered, "First rollback message does not match.");
                redelivered = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(second, redelivered, "Second rollback message does not match.");

                // Nothing beyond the two redelivered messages may be waiting.
                IMessage leftover = receiver.ReceiveNoWait();
                Assert.IsNull(leftover);
                txSession.Commit();
            }
        }
    }
}
| |
/// <summary>
/// Checks that calling Commit() after a send on a session created with a
/// non-transactional acknowledgement mode throws
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session.</param>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
[RowTest]
[Row(AcknowledgementMode.AutoAcknowledge, MsgDeliveryMode.Persistent)]
[Row(AcknowledgementMode.AutoAcknowledge, MsgDeliveryMode.NonPersistent)]
[Row(AcknowledgementMode.ClientAcknowledge, MsgDeliveryMode.Persistent)]
[Row(AcknowledgementMode.ClientAcknowledge, MsgDeliveryMode.NonPersistent)]
public void TestSendCommitNonTransaction(AcknowledgementMode ackMode, MsgDeliveryMode deliveryMode)
{
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        using(ISession plainSession = conn.CreateSession(ackMode))
        {
            IDestination tempQueue = plainSession.CreateTemporaryQueue();

            // The consumer is created but never read from in this test.
            using(IMessageConsumer receiver = plainSession.CreateConsumer(tempQueue))
            using(IMessageProducer sender = plainSession.CreateProducer(tempQueue))
            {
                sender.DeliveryMode = deliveryMode;
                sender.RequestTimeout = receiveTimeout;

                ITextMessage payload = plainSession.CreateTextMessage("SendCommitNonTransaction Message");
                sender.Send(payload);

                // Only InvalidOperationException counts as the expected
                // outcome; any other exception propagates and fails the test.
                bool commitRejected = false;
                try
                {
                    plainSession.Commit();
                }
                catch(InvalidOperationException)
                {
                    commitRejected = true;
                }

                if(!commitRejected)
                {
                    Assert.Fail("Should have thrown an InvalidOperationException.");
                }
            }
        }
    }
}
| |
/// <summary>
/// Checks that calling Commit() after a receive on a session created with a
/// non-transactional acknowledgement mode throws
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session.</param>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
[RowTest]
[Row(AcknowledgementMode.AutoAcknowledge, MsgDeliveryMode.Persistent)]
[Row(AcknowledgementMode.AutoAcknowledge, MsgDeliveryMode.NonPersistent)]
[Row(AcknowledgementMode.ClientAcknowledge, MsgDeliveryMode.Persistent)]
[Row(AcknowledgementMode.ClientAcknowledge, MsgDeliveryMode.NonPersistent)]
public void TestReceiveCommitNonTransaction(AcknowledgementMode ackMode, MsgDeliveryMode deliveryMode)
{
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        using(ISession plainSession = conn.CreateSession(ackMode))
        {
            IDestination tempQueue = plainSession.CreateTemporaryQueue();

            using(IMessageConsumer receiver = plainSession.CreateConsumer(tempQueue))
            using(IMessageProducer sender = plainSession.CreateProducer(tempQueue))
            {
                sender.DeliveryMode = deliveryMode;
                sender.RequestTimeout = receiveTimeout;

                ITextMessage payload = plainSession.CreateTextMessage("ReceiveCommitNonTransaction Message");
                sender.Send(payload);

                // Read the message back before attempting the commit.
                IMessage received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(payload, received, "First message does not match.");

                // Client-acknowledge sessions acknowledge the message explicitly.
                if(ackMode == AcknowledgementMode.ClientAcknowledge)
                {
                    received.Acknowledge();
                }

                // Only InvalidOperationException counts as the expected
                // outcome; any other exception propagates and fails the test.
                bool commitRejected = false;
                try
                {
                    plainSession.Commit();
                }
                catch(InvalidOperationException)
                {
                    commitRejected = true;
                }

                if(!commitRejected)
                {
                    Assert.Fail("Should have thrown an InvalidOperationException.");
                }
            }
        }
    }
}
| |
/// <summary>
/// Checks that calling Rollback() after a receive on a session created with
/// a non-transactional acknowledgement mode throws
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session.</param>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
[RowTest]
[Row(AcknowledgementMode.AutoAcknowledge, MsgDeliveryMode.Persistent)]
[Row(AcknowledgementMode.AutoAcknowledge, MsgDeliveryMode.NonPersistent)]
[Row(AcknowledgementMode.ClientAcknowledge, MsgDeliveryMode.Persistent)]
[Row(AcknowledgementMode.ClientAcknowledge, MsgDeliveryMode.NonPersistent)]
public void TestReceiveRollbackNonTransaction(AcknowledgementMode ackMode, MsgDeliveryMode deliveryMode)
{
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        using(ISession plainSession = conn.CreateSession(ackMode))
        {
            IDestination tempQueue = plainSession.CreateTemporaryQueue();

            using(IMessageConsumer receiver = plainSession.CreateConsumer(tempQueue))
            using(IMessageProducer sender = plainSession.CreateProducer(tempQueue))
            {
                sender.DeliveryMode = deliveryMode;
                sender.RequestTimeout = receiveTimeout;

                ITextMessage payload = plainSession.CreateTextMessage("ReceiveCommitNonTransaction Message");
                sender.Send(payload);

                // Read the message back before attempting the rollback.
                IMessage received = receiver.Receive(receiveTimeout);
                AssertTextMessageEqual(payload, received, "First message does not match.");

                // Client-acknowledge sessions acknowledge the message explicitly.
                if(ackMode == AcknowledgementMode.ClientAcknowledge)
                {
                    received.Acknowledge();
                }

                // Only InvalidOperationException counts as the expected
                // outcome; any other exception propagates and fails the test.
                bool rollbackRejected = false;
                try
                {
                    plainSession.Rollback();
                }
                catch(InvalidOperationException)
                {
                    rollbackRejected = true;
                }

                if(!rollbackRejected)
                {
                    Assert.Fail("Should have thrown an InvalidOperationException.");
                }
            }
        }
    }
}
| |
/// <summary>
/// Asserts that both messages are <see cref="ITextMessage"/> instances and
/// that their <c>Text</c> values are equal.
/// </summary>
/// <param name="expected">Message carrying the expected text.</param>
/// <param name="actual">Message whose text is compared with <paramref name="expected"/>.</param>
/// <param name="message">Failure message passed to the text equality assertion.</param>
protected void AssertTextMessageEqual(IMessage expected, IMessage actual, String message)
{
    // 'as' yields null for a null argument or a non-text message, so each
    // not-null assertion below rejects both of those cases.
    ITextMessage wanted = expected as ITextMessage;
    Assert.IsNotNull(wanted, "'expected' message not a text message");

    ITextMessage received = actual as ITextMessage;
    Assert.IsNotNull(received, "'actual' message not a text message");

    string wantedText = wanted.Text;
    string receivedText = received.Text;
    Assert.AreEqual(wantedText, receivedText, message);
}
| |
/// <summary>
/// Receives two messages in a transacted session, rolls that session back
/// and closes it, then checks that a second consumer on the same
/// destination receives both messages flagged as redelivered and nothing
/// more after committing.
/// </summary>
/// <param name="deliveryMode">Delivery mode passed to the SendMessages helper.</param>
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
public void TestRedispatchOfRolledbackTx(MsgDeliveryMode deliveryMode)
{
    const int messageCount = 2;
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        ISession firstSession = conn.CreateSession(AcknowledgementMode.Transactional);
        IDestination tempQueue = firstSession.CreateTemporaryQueue();

        // SendMessages is defined outside this file.
        SendMessages(conn, tempQueue, deliveryMode, messageCount);

        IMessageConsumer firstConsumer = firstSession.CreateConsumer(tempQueue);
        for(int i = 0; i < messageCount; i++)
        {
            Assert.IsNotNull(firstConsumer.Receive(TimeSpan.FromMilliseconds(1500)));
        }

        // Attach a second consumer while the first session's receives are
        // still uncommitted.
        ISession secondSession = conn.CreateSession(AcknowledgementMode.Transactional);
        IMessageConsumer secondConsumer = secondSession.CreateConsumer(tempQueue);

        // Explicitly roll back the first session before closing it.
        firstSession.Rollback();
        firstSession.Close();

        // Both messages should now reach the second consumer, marked as redelivered.
        for(int i = 0; i < messageCount; i++)
        {
            IMessage redelivered = secondConsumer.Receive(TimeSpan.FromMilliseconds(1500));
            Assert.IsNotNull(redelivered);
            Assert.IsTrue(redelivered.NMSRedelivered);
        }
        secondSession.Commit();

        // After the commit no further message is expected.
        IMessage leftover = secondConsumer.Receive(TimeSpan.FromMilliseconds(500));
        Assert.IsNull(leftover);
        secondSession.Close();
    }
}
| |
/// <summary>
/// Receives two messages in a transacted session and closes that session
/// without committing, then checks that a second consumer on the same
/// destination receives both messages flagged as redelivered and nothing
/// more after committing.
/// </summary>
/// <param name="deliveryMode">Delivery mode passed to the SendMessages helper.</param>
[RowTest]
[Row(MsgDeliveryMode.Persistent)]
[Row(MsgDeliveryMode.NonPersistent)]
public void TestRedispatchOfUncommittedTx(MsgDeliveryMode deliveryMode)
{
    const int messageCount = 2;
    string clientId = TEST_CLIENT_ID + ":" + new Random().Next();

    using(IConnection conn = CreateConnection(clientId))
    {
        conn.Start();

        ISession firstSession = conn.CreateSession(AcknowledgementMode.Transactional);
        IDestination tempQueue = firstSession.CreateTemporaryQueue();

        // SendMessages is defined outside this file.
        SendMessages(conn, tempQueue, deliveryMode, messageCount);

        IMessageConsumer firstConsumer = firstSession.CreateConsumer(tempQueue);
        for(int i = 0; i < messageCount; i++)
        {
            Assert.IsNotNull(firstConsumer.Receive(TimeSpan.FromMilliseconds(2000)));
        }

        // Attach a second consumer while the first session's receives are
        // still uncommitted.
        ISession secondSession = conn.CreateSession(AcknowledgementMode.Transactional);
        IMessageConsumer secondConsumer = secondSession.CreateConsumer(tempQueue);

        // Closing without a commit is expected to roll the receives back so
        // that they are re-dispatched to the second consumer.
        firstSession.Close();

        // Both messages should now reach the second consumer, marked as redelivered.
        for(int i = 0; i < messageCount; i++)
        {
            IMessage redelivered = secondConsumer.Receive(TimeSpan.FromMilliseconds(2000));
            Assert.IsNotNull(redelivered);
            Assert.IsTrue(redelivered.NMSRedelivered);
        }
        secondSession.Commit();

        // After the commit no further message is expected.
        IMessage leftover = secondConsumer.Receive(TimeSpan.FromMilliseconds(500));
        Assert.IsNull(leftover);
        secondSession.Close();
    }
}
| } |
| } |
| |
| |