Apply patch from Jose Alvarado. Thanks, Jose!
Fixes [AMQNET-503]. (See https://issues.apache.org/jira/browse/AMQNET-503)
diff --git a/src/main/csharp/NetTxConnection.cs b/src/main/csharp/NetTxConnection.cs
index bc174b7..8f1976c 100644
--- a/src/main/csharp/NetTxConnection.cs
+++ b/src/main/csharp/NetTxConnection.cs
@@ -51,6 +51,21 @@
return session;
}
+ public INetTxSession CreateNetTxSession(Transaction tx, bool enlistNativeMsDtcResource)
+ {
+ NetTxSession session = (NetTxSession)CreateSession(AcknowledgementMode.Transactional);
+ session.Enlist(tx);
+ session.EnlistsMsDtcNativeResource = enlistNativeMsDtcResource;
+ return session;
+ }
+
+ public INetTxSession CreateNetTxSession(bool enlistNativeMsDtcResource)
+ {
+ NetTxSession session = (NetTxSession)CreateSession(AcknowledgementMode.Transactional);
+ session.EnlistsMsDtcNativeResource = enlistNativeMsDtcResource;
+ return session;
+ }
+
protected override Session CreateActiveMQSession(AcknowledgementMode ackMode)
{
CheckConnected();
diff --git a/src/main/csharp/NetTxMessageConsumer.cs b/src/main/csharp/NetTxMessageConsumer.cs
index 6a9a65c..f4ef109 100644
--- a/src/main/csharp/NetTxMessageConsumer.cs
+++ b/src/main/csharp/NetTxMessageConsumer.cs
@@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Text;
+using System.Transactions;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
@@ -75,9 +76,29 @@
// distributed TX manager we need to wait whenever the TX is being
// controlled by the DTC as it completes all operations async and
// we cannot start consumption again until all its tasks have completed.)
- waitForDtcWaitHandle = this.transactionContext.InNetTransaction &&
- this.transactionContext.NetTxState ==
- NetTxTransactionContext.TxState.Pending;
+ var currentTransactionId = transactionContext.TransactionId as XATransactionId;
+ string currentLocalTxId = currentTransactionId != null
+ ? UTF8Encoding.UTF8.GetString(currentTransactionId.GlobalTransactionId)
+ : "NONE";
+
+ if (Transaction.Current != null)
+ {
+ waitForDtcWaitHandle = this.transactionContext.InNetTransaction &&
+ this.transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending ||
+ currentLocalTxId != Transaction.Current.TransactionInformation.LocalIdentifier;
+ }
+ else
+ {
+ waitForDtcWaitHandle = this.transactionContext.InNetTransaction &&
+ this.transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending;
+ }
+
+ }
+
+ //if session EnlistMsDtcNativeResource the transaction does not need to wait
+ if (this.session.EnlistsMsDtcNativeResource)
+ {
+ waitForDtcWaitHandle = false;
}
if (waitForDtcWaitHandle)
diff --git a/src/main/csharp/NetTxSession.cs b/src/main/csharp/NetTxSession.cs
index 8da035b..f7e754b 100644
--- a/src/main/csharp/NetTxSession.cs
+++ b/src/main/csharp/NetTxSession.cs
@@ -32,6 +32,7 @@
{
this.transactionContext = TransactionContext as NetTxTransactionContext;
this.transactionContext.InitializeDtcTxContext();
+ this.enlistMsDtcNativeResources = false;
}
/// <summary>
@@ -51,6 +52,14 @@
this.EnrollInSpecifiedTransaction(tx);
}
+ private bool enlistMsDtcNativeResources;
+
+ public bool EnlistsMsDtcNativeResource
+ {
+ get { return enlistMsDtcNativeResources; }
+ set { enlistMsDtcNativeResources = value; }
+ }
+
/// <summary>
/// Reports Transacted whenever there is an Ambient Transaction or the internal
/// TransactionContext is still involed in a .NET Transaction beyond the lifetime
@@ -173,6 +182,7 @@
this.currentTransactionId = tx.TransactionInformation.LocalIdentifier;
transactionContext.Begin(tx);
}
+
}
}
diff --git a/src/test/csharp/DtcTransactionsTestSupport.cs b/src/test/csharp/DtcTransactionsTestSupport.cs
index 2ce121f..030a9d3 100644
--- a/src/test/csharp/DtcTransactionsTestSupport.cs
+++ b/src/test/csharp/DtcTransactionsTestSupport.cs
@@ -53,7 +53,8 @@
private ITrace oldTracer;
protected const string sqlConnectionString =
- "Data Source=localhost;Initial Catalog=TestDB;User ID=user;Password=password";
+ // "Data Source=localhost;Initial Catalog=TestDB;User ID=user;Password=password";
+ "Data Source=.\\SQLEXPRESS;Initial Catalog=TestDB;Integrated Security = true";
protected const string testTable = "TestTable";
protected const string testColumn = "TestID";
protected const string testQueueName = "TestQueue";
@@ -484,7 +485,7 @@
{
IList entries = ExtractDataSet();
- using (INetTxSession session = connection.CreateNetTxSession())
+ using (INetTxSession session = connection.CreateNetTxSession(true))
{
IQueue queue = session.GetQueue(testQueueName);
@@ -531,7 +532,7 @@
{
IList entries = ExtractDataSet();
- using (INetTxSession session = connection.CreateNetTxSession())
+ using (INetTxSession session = connection.CreateNetTxSession(true))
{
IQueue queue = session.GetQueue(testQueueName);
@@ -578,7 +579,7 @@
protected static void ReadFromQueueAndInsertIntoDbWithCommit(INetTxConnection connection)
{
- using (INetTxSession session = connection.CreateNetTxSession())
+ using (INetTxSession session = connection.CreateNetTxSession(true))
{
IQueue queue = session.GetQueue(testQueueName);
@@ -619,7 +620,7 @@
protected static void ReadFromQueueAndInsertIntoDbWithScopeAborted(INetTxConnection connection)
{
- using (INetTxSession session = connection.CreateNetTxSession())
+ using (INetTxSession session = connection.CreateNetTxSession(true))
{
IQueue queue = session.GetQueue(testQueueName);