apply fix for: https://issues.apache.org/jira/browse/AMQNET-421
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index daa874b..3f18ba3 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -39,11 +39,11 @@
/// </summary>
public class MessageConsumer : IMessageConsumer, IDispatcher
{
- private readonly MessageTransformation messageTransformation;
- private readonly MessageDispatchChannel unconsumedMessages;
- private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
- private readonly ConsumerInfo info;
- private readonly Session session;
+ private readonly MessageTransformation messageTransformation;
+ private readonly MessageDispatchChannel unconsumedMessages;
+ private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+ private readonly ConsumerInfo info;
+ private readonly Session session;
private MessageAck pendingAck = null;
@@ -434,7 +434,7 @@
disposed = true;
}
- public void Close()
+ public virtual void Close()
{
if(!this.unconsumedMessages.Closed)
{
@@ -1029,25 +1029,25 @@
{
this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
- if(!IsAutoAcknowledgeBatch)
+ if (!IsAutoAcknowledgeBatch)
{
if (this.session.IsTransacted)
{
- this.session.TransactionContext.SyncRoot.WaitOne();
-
- // In the case where the consumer is operating in concert with a
- // distributed TX manager we need to wait whenever the TX is being
- // controlled by the DTC as it completes all operations async and
- // we cannot start consumption again until all its tasks have completed.)
- if (this.session.TransactionContext.InNetTransaction &&
- this.session.TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+ bool waitForDtcWaitHandle = false;
+ lock (this.session.TransactionContext.SyncRoot)
{
- this.session.TransactionContext.SyncRoot.ReleaseMutex();
- this.session.TransactionContext.DtcWaitHandle.WaitOne();
+ // In the case where the consumer is operating in concert with a
+ // distributed TX manager we need to wait whenever the TX is being
+ // controlled by the DTC as it completes all operations async and
+ // we cannot start consumption again until all its tasks have completed.)
+ waitForDtcWaitHandle = this.session.TransactionContext.InNetTransaction &&
+ this.session.TransactionContext.NetTxState ==
+ TransactionContext.TxState.Pending;
}
- else
+
+ if (waitForDtcWaitHandle)
{
- this.session.TransactionContext.SyncRoot.ReleaseMutex();
+ this.session.TransactionContext.DtcWaitHandle.WaitOne();
}
}
@@ -1623,6 +1623,11 @@
return this.info.Destination.Equals(dest);
}
+ internal bool Closed
+ {
+ get { return this.unconsumedMessages.Closed; }
+ }
+
private void DoOptimizedAck(object state)
{
if (this.optimizeAcknowledge && !this.unconsumedMessages.Closed)
diff --git a/src/main/csharp/NetTxSession.cs b/src/main/csharp/NetTxSession.cs
index d977a03..59a8292 100644
--- a/src/main/csharp/NetTxSession.cs
+++ b/src/main/csharp/NetTxSession.cs
@@ -75,18 +75,17 @@
{
if (TransactionContext.InNetTransaction)
{
- TransactionContext.SyncRoot.WaitOne();
-
- if (TransactionContext.InNetTransaction)
+ lock (TransactionContext.SyncRoot)
{
- // Must wait for all the DTC operations to complete before
- // moving on from this close call.
- TransactionContext.SyncRoot.ReleaseMutex();
- this.TransactionContext.DtcWaitHandle.WaitOne();
- TransactionContext.SyncRoot.WaitOne();
+ if (TransactionContext.InNetTransaction)
+ {
+ // Must wait for all the DTC operations to complete before
+ // moving on from this close call.
+ Monitor.Exit(TransactionContext.SyncRoot);
+ this.TransactionContext.DtcWaitHandle.WaitOne();
+ Monitor.Enter(TransactionContext.SyncRoot);
+ }
}
-
- TransactionContext.SyncRoot.ReleaseMutex();
}
base.Close();
@@ -111,24 +110,23 @@
internal override void DoStartTransaction()
{
- TransactionContext.SyncRoot.WaitOne();
-
- if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+ lock (TransactionContext.SyncRoot)
{
- // To late to participate in this TX, we have to wait for it to complete then
- // we can create a new TX and start from there.
- TransactionContext.SyncRoot.ReleaseMutex();
- TransactionContext.DtcWaitHandle.WaitOne();
- TransactionContext.SyncRoot.WaitOne();
+ if (TransactionContext.InNetTransaction && TransactionContext.NetTxState == TransactionContext.TxState.Pending)
+ {
+ // To late to participate in this TX, we have to wait for it to complete then
+ // we can create a new TX and start from there.
+ Monitor.Exit(TransactionContext.SyncRoot);
+ TransactionContext.DtcWaitHandle.WaitOne();
+ Monitor.Enter(TransactionContext.SyncRoot);
+ }
+
+ if (!TransactionContext.InNetTransaction && Transaction.Current != null)
+ {
+ Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
+ EnrollInSpecifiedTransaction(Transaction.Current);
+ }
}
-
- if (!TransactionContext.InNetTransaction && Transaction.Current != null)
- {
- Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
- EnrollInSpecifiedTransaction(Transaction.Current);
- }
-
- TransactionContext.SyncRoot.ReleaseMutex();
}
private void EnrollInSpecifiedTransaction(Transaction tx)
diff --git a/src/main/csharp/TransactionContext.cs b/src/main/csharp/TransactionContext.cs
index 2639610..d7ec8da 100644
--- a/src/main/csharp/TransactionContext.cs
+++ b/src/main/csharp/TransactionContext.cs
@@ -229,7 +229,7 @@
// Once the DTC calls prepare we lock this object and don't unlock it again until
// the TX has either completed or terminated, the users of this class should use
// this sync point when the TX is a DTC version as opposed to a local one.
- private readonly Mutex syncObject = new Mutex();
+ private readonly object syncObject = new Mutex();
public enum TxState
{
@@ -238,7 +238,7 @@
private TxState netTxState = TxState.None;
- public Mutex SyncRoot
+ public object SyncRoot
{
get { return this.syncObject; }
}