| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| using System; | |
| using Apache.NMS.Util; | |
| using NUnit.Framework; | |
| namespace Apache.NMS.Test | |
| { | |
| //[TestFixture] | |
| public class TransactionTest : NMSTest | |
| { | |
| protected TransactionTest(NMSTestSupport testSupport) | |
| : base(testSupport) | |
| { | |
| } | |
| //[Test] | |
| public virtual void TestSendRollback( | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) | |
| { | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| ITextMessage firstMsgSend = session.CreateTextMessage("First Message"); | |
| producer.Send(firstMsgSend); | |
| session.Commit(); | |
| ITextMessage rollbackMsg = session.CreateTextMessage("I'm going to get rolled back."); | |
| producer.Send(rollbackMsg); | |
| session.Rollback(); | |
| ITextMessage secondMsgSend = session.CreateTextMessage("Second Message"); | |
| producer.Send(secondMsgSend); | |
| session.Commit(); | |
| // Receive the messages | |
| IMessage message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, message, "First message does not match."); | |
| message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(secondMsgSend, message, "Second message does not match."); | |
| // validates that the rollback was not consumed | |
| session.Commit(); | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestSendSessionClose( | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| ITextMessage firstMsgSend; | |
| ITextMessage secondMsgSend; | |
| using(IConnection connection1 = CreateConnection(GetTestClientId())) | |
| { | |
| connection1.Start(); | |
| using(ISession session1 = connection1.CreateSession(AcknowledgementMode.Transactional)) | |
| { | |
| IDestination destination1 = GetClearDestination(session1, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session1.CreateConsumer(destination1)) | |
| { | |
| // First connection session that sends one message, and the | |
| // second message is implicitly rolled back as the session is | |
| // disposed before Commit() can be called. | |
| using(IConnection connection2 = CreateConnection(GetTestClientId())) | |
| { | |
| connection2.Start(); | |
| using(ISession session2 = connection2.CreateSession(AcknowledgementMode.Transactional)) | |
| { | |
| IDestination destination2 = GetClearDestination(session2, DestinationType.Queue, testQueueRef); | |
| using(IMessageProducer producer = session2.CreateProducer(destination2)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| firstMsgSend = session2.CreateTextMessage("First Message"); | |
| producer.Send(firstMsgSend); | |
| session2.Commit(); | |
| ITextMessage rollbackMsg = session2.CreateTextMessage("I'm going to get rolled back."); | |
| producer.Send(rollbackMsg); | |
| } | |
| } | |
| } | |
| // Second connection session that will send one message. | |
| using(IConnection connection2 = CreateConnection(GetTestClientId())) | |
| { | |
| connection2.Start(); | |
| using(ISession session2 = connection2.CreateSession(AcknowledgementMode.Transactional)) | |
| { | |
| IDestination destination2 = GetClearDestination(session2, DestinationType.Queue, testQueueRef); | |
| using(IMessageProducer producer = session2.CreateProducer(destination2)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| secondMsgSend = session2.CreateTextMessage("Second Message"); | |
| producer.Send(secondMsgSend); | |
| session2.Commit(); | |
| } | |
| } | |
| } | |
| // Check the consumer to verify which messages were actually received. | |
| IMessage message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, message, "First message does not match."); | |
| message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(secondMsgSend, message, "Second message does not match."); | |
| // validates that the rollback was not consumed | |
| session1.Commit(); | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestReceiveRollback( | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) | |
| { | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| // Send both messages | |
| ITextMessage firstMsgSend = session.CreateTextMessage("First Message"); | |
| producer.Send(firstMsgSend); | |
| ITextMessage secondMsgSend = session.CreateTextMessage("Second Message"); | |
| producer.Send(secondMsgSend); | |
| session.Commit(); | |
| // Receive the messages | |
| IMessage message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, message, "First message does not match."); | |
| session.Commit(); | |
| message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(secondMsgSend, message, "Second message does not match."); | |
| // Rollback so we can get that last message again. | |
| session.Rollback(); | |
| IMessage rollbackMsg = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(secondMsgSend, rollbackMsg, "Rollback message does not match."); | |
| session.Commit(); | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestReceiveTwoThenRollback( | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional)) | |
| { | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| // Send both messages | |
| ITextMessage firstMsgSend = session.CreateTextMessage("First Message"); | |
| producer.Send(firstMsgSend); | |
| ITextMessage secondMsgSend = session.CreateTextMessage("Second Message"); | |
| producer.Send(secondMsgSend); | |
| session.Commit(); | |
| // Receive the messages | |
| IMessage message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, message, "First message does not match."); | |
| message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(secondMsgSend, message, "Second message does not match."); | |
| // Rollback so we can get that last two messages again. | |
| session.Rollback(); | |
| IMessage rollbackMsg = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, rollbackMsg, "First rollback message does not match."); | |
| rollbackMsg = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(secondMsgSend, rollbackMsg, "Second rollback message does not match."); | |
| Assert.IsNull(consumer.ReceiveNoWait()); | |
| session.Commit(); | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestSendCommitNonTransaction( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge)] | |
| AcknowledgementMode ackMode, | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| ITextMessage firstMsgSend = session.CreateTextMessage("SendCommitNonTransaction Message"); | |
| producer.Send(firstMsgSend); | |
| try | |
| { | |
| session.Commit(); | |
| Assert.Fail("Should have thrown an InvalidOperationException."); | |
| } | |
| catch(InvalidOperationException) | |
| { | |
| } | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestReceiveCommitNonTransaction( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge)] | |
| AcknowledgementMode ackMode, | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| ITextMessage firstMsgSend = session.CreateTextMessage("ReceiveCommitNonTransaction Message"); | |
| producer.Send(firstMsgSend); | |
| // Receive the messages | |
| IMessage message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, message, "First message does not match."); | |
| if(AcknowledgementMode.ClientAcknowledge == ackMode) | |
| { | |
| message.Acknowledge(); | |
| } | |
| try | |
| { | |
| session.Commit(); | |
| Assert.Fail("Should have thrown an InvalidOperationException."); | |
| } | |
| catch(InvalidOperationException) | |
| { | |
| } | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestReceiveRollbackNonTransaction( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge)] | |
| AcknowledgementMode ackMode, | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| ITextMessage firstMsgSend = session.CreateTextMessage("ReceiveCommitNonTransaction Message"); | |
| producer.Send(firstMsgSend); | |
| // Receive the messages | |
| IMessage message = consumer.Receive(receiveTimeout); | |
| AssertTextMessageEqual(firstMsgSend, message, "First message does not match."); | |
| if(AcknowledgementMode.ClientAcknowledge == ackMode) | |
| { | |
| message.Acknowledge(); | |
| } | |
| try | |
| { | |
| session.Rollback(); | |
| Assert.Fail("Should have thrown an InvalidOperationException."); | |
| } | |
| catch(InvalidOperationException) | |
| { | |
| } | |
| } | |
| } | |
| } | |
| } | |
| /// <summary> | |
| /// Assert that two messages are ITextMessages and their text bodies are equal. | |
| /// </summary> | |
| /// <param name="expected"></param> | |
| /// <param name="actual"></param> | |
| /// <param name="message"></param> | |
| protected void AssertTextMessageEqual(IMessage expected, IMessage actual, String message) | |
| { | |
| ITextMessage expectedTextMsg = expected as ITextMessage; | |
| Assert.IsNotNull(expectedTextMsg, "'expected' message not a text message"); | |
| ITextMessage actualTextMsg = actual as ITextMessage; | |
| Assert.IsNotNull(actualTextMsg, "'actual' message not a text message"); | |
| Assert.AreEqual(expectedTextMsg.Text, actualTextMsg.Text, message); | |
| } | |
| //[Test] | |
| public virtual void TestRedispatchOfRolledbackTx( | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| SendMessages(connection, destination, deliveryMode, 2); | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(1500))); | |
| Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(1500))); | |
| // install another consumer while message dispatch is unacked/uncommitted | |
| ISession redispatchSession = connection.CreateSession(AcknowledgementMode.Transactional); | |
| IMessageConsumer redispatchConsumer = redispatchSession.CreateConsumer(destination); | |
| session.Rollback(); | |
| session.Close(); | |
| IMessage msg = redispatchConsumer.Receive(TimeSpan.FromMilliseconds(1500)); | |
| Assert.IsNotNull(msg); | |
| Assert.IsTrue(msg.NMSRedelivered); | |
| Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount")); | |
| msg = redispatchConsumer.Receive(TimeSpan.FromMilliseconds(1500)); | |
| Assert.IsNotNull(msg); | |
| Assert.IsTrue(msg.NMSRedelivered); | |
| Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount")); | |
| redispatchSession.Commit(); | |
| Assert.IsNull(redispatchConsumer.Receive(TimeSpan.FromMilliseconds(500))); | |
| redispatchSession.Close(); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestRedispatchOfUncommittedTx( | |
| //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection(GetTestClientId())) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| SendMessages(connection, destination, deliveryMode, 2); | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(2000))); | |
| Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(2000))); | |
| // install another consumer while message dispatch is unacked/uncommitted | |
| ISession redispatchSession = connection.CreateSession(AcknowledgementMode.Transactional); | |
| IMessageConsumer redispatchConsumer = redispatchSession.CreateConsumer(destination); | |
| // no commit so will auto rollback and get re-dispatched to redisptachConsumer | |
| session.Close(); | |
| IMessage msg = redispatchConsumer.Receive(TimeSpan.FromMilliseconds(2000)); | |
| Assert.IsNotNull(msg); | |
| Assert.IsTrue(msg.NMSRedelivered); | |
| Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount")); | |
| msg = redispatchConsumer.Receive(TimeSpan.FromMilliseconds(2000)); | |
| Assert.IsNotNull(msg); | |
| Assert.IsTrue(msg.NMSRedelivered); | |
| Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount")); | |
| redispatchSession.Commit(); | |
| Assert.IsNull(redispatchConsumer.Receive(TimeSpan.FromMilliseconds(500))); | |
| redispatchSession.Close(); | |
| } | |
| } | |
| } | |
| } | |