| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections; |
| using System.Data.SqlClient; |
| using System.IO; |
| using System.Threading; |
| using System.Transactions; |
| using Apache.NMS.ActiveMQ.Transport; |
| using Apache.NMS.ActiveMQ.Transport.Tcp; |
| using NUnit.Framework; |
| |
| namespace Apache.NMS.ActiveMQ.Test |
| { |
| [TestFixture] |
| [Category("Manual")] |
| class DtcConsumerTransactionsTest : DtcTransactionsTestSupport |
| { |
| [SetUp] |
| public override void SetUp() |
| { |
| base.SetUp(); |
| |
| this.dtcFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionUri)); |
| this.dtcFactory.ConfiguredResourceManagerId = Guid.NewGuid().ToString(); |
| } |
| |
| [Test] |
| public void TestRedelivered() |
| { |
| // enqueue several messages |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| // receive just one |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.Start(); |
| |
| using (INetTxSession session = connection.CreateNetTxSession()) |
| { |
| IQueue queue = session.GetQueue(testQueueName); |
| |
| // read message from queue and insert into db table |
| using (IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) |
| using (SqlConnection sqlConnection = new SqlConnection(createDbConnectionString)) |
| using (SqlCommand sqlInsertCommand = new SqlCommand()) |
| { |
| sqlConnection.Open(); |
| sqlInsertCommand.Connection = sqlConnection; |
| |
| ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; |
| sqlInsertCommand.CommandText = |
| string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text)); |
| sqlInsertCommand.ExecuteNonQuery(); |
| |
| scoped.Complete(); |
| } |
| } |
| |
| session.Close(); |
| } |
| } |
| |
| // check that others message have status redelivered = false |
| IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionUri)); |
| |
| using (IConnection connection = checkFactory.CreateConnection()) |
| { |
| connection.Start(); |
| |
| using (ISession session = connection.CreateSession()) |
| using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) |
| { |
| IEnumerator enumerator = browser.GetEnumerator(); |
| |
| while (enumerator.MoveNext()) |
| { |
| IMessage msg = enumerator.Current as IMessage; |
| Assert.IsNotNull(msg, "message is not in the queue!"); |
| Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); |
| } |
| } |
| } |
| } |
| |
| [Test] |
| public void TestRedeliveredCase2() |
| { |
| const int messageCount = 300; |
| const int receiveCount = 150; |
| |
| // enqueue several messages |
| PurgeDatabase(); |
| PurgeAndFillQueue(messageCount); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.Start(); |
| |
| // receive half of total messages |
| for (int i = 0; i < receiveCount; i++) |
| { |
| using (INetTxSession session = connection.CreateNetTxSession()) |
| { |
| IQueue queue = session.GetQueue(testQueueName); |
| |
| // read message from queue and insert into db table |
| using (IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) |
| using (SqlConnection sqlConnection = new SqlConnection(createDbConnectionString)) |
| using (SqlCommand sqlInsertCommand = new SqlCommand()) |
| { |
| sqlConnection.Open(); |
| sqlInsertCommand.Connection = sqlConnection; |
| |
| ITextMessage message = |
| consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; |
| sqlInsertCommand.CommandText = |
| string.Format("INSERT INTO {0} VALUES ({1})", testTable, |
| Convert.ToInt32(message.Text)); |
| sqlInsertCommand.ExecuteNonQuery(); |
| |
| scoped.Complete(); |
| } |
| } |
| |
| session.Close(); |
| } |
| |
| Tracer.Debug("Completed for loop iteration #" + i); |
| } |
| } |
| |
| // check that others message have status redelivered = false |
| IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionUri)); |
| |
| using (IConnection connection = checkFactory.CreateConnection()) |
| { |
| connection.Start(); |
| |
| using (ISession session = connection.CreateSession()) |
| using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) |
| { |
| IEnumerator enumerator = browser.GetEnumerator(); |
| |
| while (enumerator.MoveNext()) |
| { |
| IMessage msg = enumerator.Current as IMessage; |
| Assert.IsNotNull(msg, "message is not in the queue!"); |
| Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); |
| } |
| } |
| } |
| } |
| |
| [Test] |
| public void TestRedeliveredCase3() |
| { |
| const int messageCount = 300; |
| const int receiveCount = 150; |
| |
| // enqueue several messages |
| PurgeDatabase(); |
| PurgeAndFillQueue(messageCount); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.Start(); |
| |
| // receive half of total messages |
| using (INetTxSession session = connection.CreateNetTxSession()) |
| { |
| IQueue queue = session.GetQueue(testQueueName); |
| |
| // read message from queue and insert into db table |
| using (IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| for (int i = 0; i < receiveCount; i++) |
| { |
| using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) |
| using (SqlConnection sqlConnection = new SqlConnection(createDbConnectionString)) |
| using (SqlCommand sqlInsertCommand = new SqlCommand()) |
| { |
| sqlConnection.Open(); |
| sqlInsertCommand.Connection = sqlConnection; |
| |
| ITextMessage message = |
| consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; |
| sqlInsertCommand.CommandText = |
| string.Format("INSERT INTO {0} VALUES ({1})", testTable, |
| Convert.ToInt32(message.Text)); |
| sqlInsertCommand.ExecuteNonQuery(); |
| scoped.Complete(); |
| } |
| } |
| } |
| |
| session.Close(); |
| } |
| } |
| |
| Tracer.Debug("First stage ok"); |
| |
| // check that others message have status redelivered = false |
| IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionUri)); |
| |
| using (IConnection connection = checkFactory.CreateConnection()) |
| { |
| connection.Start(); |
| |
| using (ISession session = connection.CreateSession()) |
| using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) |
| { |
| IEnumerator enumerator = browser.GetEnumerator(); |
| |
| while (enumerator.MoveNext()) |
| { |
| IMessage msg = enumerator.Current as IMessage; |
| Assert.IsNotNull(msg, "message is not in the queue!"); |
| Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); |
| } |
| } |
| } |
| } |
| |
| [Test] |
| public void TestRedeliveredNoComplete() |
| { |
| const int messageCount = 300; |
| const int receiveCount = 150; |
| |
| // enqueue several messages |
| PurgeDatabase(); |
| PurgeAndFillQueue(messageCount); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| // allow no redelivery so that message immediatly goes to the DLQ if first read fails |
| connection.RedeliveryPolicy.MaximumRedeliveries = 0; |
| connection.Start(); |
| |
| // receive half of total messages |
| using (INetTxSession session = connection.CreateNetTxSession()) |
| { |
| IQueue queue = session.GetQueue(testQueueName); |
| |
| // read message from queue and insert into db table |
| using (IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| for (int i = 0; i < receiveCount; i++) |
| { |
| using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) |
| using (SqlConnection sqlConnection = new SqlConnection(createDbConnectionString)) |
| using (SqlCommand sqlInsertCommand = new SqlCommand()) |
| { |
| sqlConnection.Open(); |
| sqlInsertCommand.Connection = sqlConnection; |
| |
| ITextMessage message = |
| consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; |
| sqlInsertCommand.CommandText = |
| string.Format("INSERT INTO {0} VALUES ({1})", testTable, |
| Convert.ToInt32(message.Text)); |
| sqlInsertCommand.ExecuteNonQuery(); |
| } |
| } |
| } |
| |
| session.Close(); |
| } |
| } |
| |
| Tracer.Debug("First stage ok"); |
| |
| // check that others message have status redelivered = false |
| IConnectionFactory checkFactory = new ConnectionFactory(ReplaceEnvVar(connectionUri)); |
| |
| using (IConnection connection = checkFactory.CreateConnection()) |
| { |
| connection.Start(); |
| |
| using (ISession session = connection.CreateSession()) |
| using (IQueueBrowser browser = session.CreateBrowser(session.GetQueue(testQueueName))) |
| { |
| IEnumerator enumerator = browser.GetEnumerator(); |
| while (enumerator.MoveNext()) |
| { |
| IMessage msg = enumerator.Current as IMessage; |
| Assert.IsNotNull(msg, "message is not in the queue!"); |
| Assert.IsFalse(msg.NMSRedelivered, "message is redelivered!"); |
| } |
| } |
| } |
| } |
| |
| [Test] |
| public void TestRecoveryAfterCommitFailsBeforeSent() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; |
| |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| |
| Thread.Sleep(1000); |
| } |
| |
| // transaction should not have been commited |
| VerifyNoMessagesInQueueNoRecovery(); |
| |
| // verify sql server has commited the transaction |
| VerifyDatabaseTableIsFull(); |
| |
| // check messages are not present in the queue |
| VerifyNoMessagesInQueue(); |
| } |
| |
| [Test] |
| public void TestRecoveryAfterCommitFailsAfterSent() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook; |
| |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| |
| Thread.Sleep(1000); |
| } |
| |
| // transaction should have been commited |
| VerifyNoMessagesInQueueNoRecovery(); |
| |
| // verify sql server has commited the transaction |
| VerifyDatabaseTableIsFull(); |
| |
| // check messages are not present in the queue |
| VerifyNoMessagesInQueue(); |
| } |
| |
| [Test] |
| public void TestIterativeTransactedConsume() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(5 * MSG_COUNT); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| |
| Thread.Sleep(2000); |
| } |
| |
| // verify sql server has commited the transaction |
| VerifyDatabaseTableIsFull(5 * MSG_COUNT); |
| |
| // check messages are NOT present in the queue |
| VerifyNoMessagesInQueueNoRecovery(); |
| } |
| |
| [Test] |
| public void TestConsumeWithDBInsertLogLocation() |
| { |
| const string logLocation = @".\RecoveryDir"; |
| string newConnectionUri = |
| connectionUri + "?nms.RecoveryPolicy.RecoveryLogger.Location=" + logLocation + |
| "&nms.configuredResourceManagerId=" + |
| dtcFactory.ConfiguredResourceManagerId; |
| |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| if (Directory.Exists(logLocation)) |
| { |
| Directory.Delete(logLocation, true); |
| } |
| |
| Directory.CreateDirectory(logLocation); |
| |
| dtcFactory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri)); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; |
| |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| |
| Thread.Sleep(2000); |
| } |
| |
| Assert.AreEqual(1, Directory.GetFiles(logLocation).Length); |
| |
| // verify sql server has commited the transaction |
| VerifyDatabaseTableIsFull(); |
| |
| // check messages are NOT present in the queue |
| NetTxTransactionContext.ResetDtcRecovery(); |
| VerifyBrokerQueueCount(0, newConnectionUri); |
| |
| Assert.AreEqual(0, Directory.GetFiles(logLocation).Length); |
| } |
| |
| [Test] |
| public void TestRecoverAfterTransactionScopeAborted() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ReadFromQueueAndInsertIntoDbWithScopeAborted(connection); |
| |
| Thread.Sleep(2000); |
| } |
| |
| // verify sql server has NOT commited the transaction |
| VerifyDatabaseTableIsEmpty(); |
| |
| // check messages are present in the queue |
| NetTxTransactionContext.ResetDtcRecovery(); |
| VerifyBrokerQueueCount(); |
| } |
| |
| [Test] |
| public void TestRecoverAfterRollbackFailWhenScopeAborted() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPreProcessor += this.FailOnRollbackTransportHook; |
| |
| ReadFromQueueAndInsertIntoDbWithScopeAborted(connection); |
| |
| Thread.Sleep(2000); |
| } |
| |
| // verify sql server has NOT commited the transaction |
| VerifyDatabaseTableIsEmpty(); |
| |
| // check messages are recovered and present in the queue |
| NetTxTransactionContext.ResetDtcRecovery(); |
| VerifyBrokerQueueCount(); |
| } |
| |
| [Test] |
| public void TestRecoverAfterFailOnTransactionBeforePrepareSent() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPreProcessor += this.FailOnPrepareTransportHook; |
| |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| |
| Thread.Sleep(2000); |
| } |
| |
| // Messages are visible since no prepare sent |
| VerifyBrokerQueueCountNoRecovery(); |
| |
| // verify sql server has NOT commited the transaction |
| VerifyDatabaseTableIsEmpty(); |
| |
| // check messages are present in the queue |
| NetTxTransactionContext.ResetDtcRecovery(); |
| VerifyBrokerQueueCount(); |
| } |
| |
| [Test] |
| public void TestRecoverAfterFailOnTransactionAfterPrepareSent() |
| { |
| // Test initialize - Fills in queue with data to send and clears the DB. |
| PurgeDatabase(); |
| PurgeAndFillQueue(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| ITransport transport = (connection as Connection).ITransport; |
| TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport; |
| Assert.IsNotNull(tcpFaulty); |
| tcpFaulty.OnewayCommandPostProcessor += this.FailOnPrepareTransportHook; |
| |
| connection.ExceptionListener += this.OnException; |
| connection.Start(); |
| |
| ReadFromQueueAndInsertIntoDbWithCommit(connection); |
| |
| Thread.Sleep(2000); |
| } |
| |
| // not visible yet because it must be rolled back |
| VerifyNoMessagesInQueueNoRecovery(); |
| |
| // verify sql server has NOT commited the transaction |
| VerifyDatabaseTableIsEmpty(); |
| |
| // check messages are present in the queue |
| NetTxTransactionContext.ResetDtcRecovery(); |
| VerifyBrokerQueueCount(); |
| } |
| |
| [Test] |
| public void MessageShouldEnlistToTheCorrectTransaction() |
| { |
| const int messageCount = 100; |
| const int receiveCount = 100; |
| |
| // enqueue several messages |
| PurgeDatabase(); |
| PurgeAndFillQueue(messageCount); |
| |
| var enlistment = new TestSinglePhaseCommit(); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| { |
| connection.Start(); |
| |
| // receive half of total messages |
| using (INetTxSession session = connection.CreateNetTxSession()) |
| { |
| IQueue queue = session.GetQueue(testQueueName); |
| using (IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| for (int i = 0; i < receiveCount; i++) |
| { |
| try |
| { |
| using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) |
| { |
| ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage; |
| |
| Transaction.Current.EnlistDurable(Guid.NewGuid(), enlistment, EnlistmentOptions.None); |
| if (new Random().Next(2) == 0) |
| { |
| Tracer.InfoFormat("Throwing random Exception for Message {0}", message.NMSMessageId); |
| throw new Exception(); |
| } |
| |
| scoped.Complete(); |
| } |
| } |
| catch |
| { |
| } |
| |
| Assert.False(enlistment.singlePhaseCommit, "No single phase commit should happen."); |
| } |
| } |
| } |
| } |
| } |
| |
| internal class TestSinglePhaseCommit : ISinglePhaseNotification |
| { |
| public bool singlePhaseCommit = false; |
| |
| public void Prepare(PreparingEnlistment preparingEnlistment) |
| { |
| preparingEnlistment.Prepared(); |
| } |
| public void Commit(Enlistment enlistment) |
| { |
| enlistment.Done(); |
| } |
| |
| public void Rollback(Enlistment enlistment) |
| { |
| enlistment.Done(); |
| } |
| public void InDoubt(Enlistment enlistment) |
| { |
| enlistment.Done(); |
| } |
| public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment) |
| { |
| Tracer.Info("Performing invalid single phase commit."); |
| singlePhaseCommit = true; |
| singlePhaseEnlistment.Committed(); |
| } |
| } |
| |
| #region Asynchronous Consumer Inside of a Transaction Test / Example |
| |
| private const int BATCH_COUNT = 5; |
| private int batchSequence; |
| private DependentTransaction batchTxControl; |
| private readonly ManualResetEvent awaitBatchProcessingStart = new ManualResetEvent(false); |
| |
| [Test] |
| public void TestTransactedAsyncConsumption() |
| { |
| PurgeDatabase(); |
| PurgeAndFillQueue(MSG_COUNT * BATCH_COUNT); |
| |
| using (INetTxConnection connection = dtcFactory.CreateNetTxConnection()) |
| using (NetTxSession session = connection.CreateNetTxSession() as NetTxSession) |
| { |
| IQueue queue = session.GetQueue(testQueueName); |
| IMessageConsumer consumer = session.CreateConsumer(queue); |
| consumer.Listener += AsyncTxAwareOnMessage; |
| |
| // Be carefull, message are dispatched once this is done, so you could receive |
| // a Message outside a TX. We use the awaitBatchProcessingStart event here to |
| // gate te OnMessage callback, once that method returns the Message is ack'd and |
| // no longer has a chance to participate in a TX. |
| connection.Start(); |
| |
| for (int i = 0; i < BATCH_COUNT; ++i) |
| { |
| using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew)) |
| { |
| session.Enlist(Transaction.Current); |
| |
| batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete); |
| awaitBatchProcessingStart.Set(); |
| scoped.Complete(); |
| } |
| |
| // Reenlisting to fast seems to annoy the DTC. Also since DTC operations are |
| // async we need to allow a little time for lag so that the last TX actually |
| // completes before we start a new one. |
| Thread.Sleep(250); |
| } |
| } |
| |
| // verify sql server has commited the transaction |
| VerifyDatabaseTableIsFull(MSG_COUNT * BATCH_COUNT); |
| |
| // check messages are NOT present in the queue |
| VerifyNoMessagesInQueue(); |
| } |
| |
| private void AsyncTxAwareOnMessage(IMessage message) |
| { |
| awaitBatchProcessingStart.WaitOne(); |
| |
| try |
| { |
| using (TransactionScope scoped = new TransactionScope(batchTxControl)) |
| using (SqlConnection sqlConnection = new SqlConnection(createDbConnectionString)) |
| using (SqlCommand sqlInsertCommand = new SqlCommand()) |
| { |
| sqlConnection.Open(); |
| sqlInsertCommand.Connection = sqlConnection; |
| |
| ITextMessage textMessage = message as ITextMessage; |
| Assert.IsNotNull(message, "missing message"); |
| sqlInsertCommand.CommandText = |
| string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(textMessage.Text)); |
| sqlInsertCommand.ExecuteNonQuery(); |
| scoped.Complete(); |
| } |
| |
| if(++batchSequence == MSG_COUNT) |
| { |
| batchSequence = 0; |
| awaitBatchProcessingStart.Reset(); |
| batchTxControl.Complete(); |
| } |
| } |
| catch (Exception e) |
| { |
| Tracer.Debug("TX;Error from TransactionScope: " + e.Message); |
| Tracer.Debug(e.ToString()); |
| } |
| } |
| |
| #endregion |
| } |
| } |