https://issues.apache.org/jira/browse/AMQNET-449
Race on connection interruption processing and consumer close.
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index a0c3699..c331381 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -90,7 +90,7 @@
private ICompressionPolicy compressionPolicy = new CompressionPolicy();
private readonly IdGenerator clientIdGenerator;
private int consumerIdCounter = 0;
- private volatile CountDownLatch transportInterruptionProcessingComplete;
+ private long transportInterruptionProcessingComplete;
private readonly MessageTransformation messageTransformation;
private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
private AdvisoryConsumer advisoryConsumer = null;
@@ -1257,21 +1257,18 @@
// Ensure that if there's an advisory consumer we don't add it to the
// set of consumers that need interruption processing.
- this.transportInterruptionProcessingComplete =
- new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
+ Interlocked.Exchange(ref transportInterruptionProcessingComplete, 1);
if(Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Connection[{0}]: Transport interrupted, dispatchers: {1}", this.ConnectionId, dispatchers.Count);
}
- SignalInterruptionProcessingNeeded();
-
foreach(Session session in this.sessions)
{
try
{
- session.ClearMessagesInProgress();
+ session.ClearMessagesInProgress(ref transportInterruptionProcessingComplete);
}
catch(Exception ex)
{
@@ -1280,6 +1277,14 @@
}
}
+ if (Interlocked.Decrement(ref transportInterruptionProcessingComplete) > 0)
+ {
+ Tracer.DebugFormat("Transport interrupted - processing required, dispatchers: {0}",
+ Interlocked.Read(ref transportInterruptionProcessingComplete));
+
+ SignalInterruptionProcessingNeeded();
+ }
+
if(this.ConnectionInterruptedListener != null && !this.closing.Value)
{
try
@@ -1397,59 +1402,39 @@
private void WaitForTransportInterruptionProcessingToComplete()
{
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if(cdl != null)
+ if(!closed.Value && !transportFailed.Value && Interlocked.Read(ref transportInterruptionProcessingComplete) > 0)
{
- if(!closed.Value && cdl.Remaining > 0)
- {
- Tracer.WarnFormat("Connection[{0}]: Dispatch paused, waiting for outstanding dispatch interruption " +
- "processing ({1}) to complete..", this.ConnectionId, cdl.Remaining);
- cdl.await(TimeSpan.FromSeconds(10));
- }
-
+ Tracer.WarnFormat("Connection[{0}]: Dispatch with outstanding dispatch interruption processing count: {1}",
+ this.ConnectionId, Interlocked.Read(ref transportInterruptionProcessingComplete));
SignalInterruptionProcessingComplete();
}
}
internal void TransportInterruptionProcessingComplete()
{
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if(cdl != null)
+ if (Interlocked.Decrement(ref transportInterruptionProcessingComplete) == 0)
{
- cdl.countDown();
- try
- {
- SignalInterruptionProcessingComplete();
- }
- catch
- {
- }
+ SignalInterruptionProcessingComplete();
}
}
private void SignalInterruptionProcessingComplete()
{
- CountDownLatch cdl = this.transportInterruptionProcessingComplete;
- if(cdl.Remaining == 0)
+ Tracer.DebugFormat("Connection[{0}]: signalled TransportInterruptionProcessingComplete: {1}",
+ this.ConnectionId, Interlocked.Read(ref transportInterruptionProcessingComplete));
+
+ FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+ if(failoverTransport != null)
{
+ failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
if(Tracer.IsDebugEnabled)
{
- Tracer.DebugFormat("Connection[{0}]: transportInterruptionProcessingComplete.", this.info.ConnectionId);
- }
-
- this.transportInterruptionProcessingComplete = null;
-
- FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
- if(failoverTransport != null)
- {
- failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1})" +
- " of interruption completion.", this.ConnectionId, failoverTransport);
- }
+ Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1})" +
+ " of interruption completion.", this.ConnectionId, failoverTransport);
}
}
+
+ Interlocked.Exchange(ref transportInterruptionProcessingComplete, 0);
}
private void SignalInterruptionProcessingNeeded()
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index f7c95d1..f8289ae 100755
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -922,7 +922,7 @@
}
}
- internal void ClearMessagesInProgress()
+ internal void ClearMessagesInProgress(ref long transportInterruptionProcessingComplete)
{
if(this.executor != null)
{
@@ -940,6 +940,7 @@
foreach(MessageConsumer consumer in this.consumers.Values)
{
consumer.InProgressClearRequired();
+ Interlocked.Increment(ref transportInterruptionProcessingComplete);
Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
}
}
diff --git a/src/main/csharp/State/ConnectionStateTracker.cs b/src/main/csharp/State/ConnectionStateTracker.cs
index b61deaa..65814ea 100644
--- a/src/main/csharp/State/ConnectionStateTracker.cs
+++ b/src/main/csharp/State/ConnectionStateTracker.cs
@@ -450,6 +450,8 @@
{
ss.RemoveConsumer(id);
}
+
+ cs.RecoveringPullConsumers.Remove(id);
}
}
}