https://issues.apache.org/jira/browse/AMQNET-449

Race on connection interruption processing and consumer close.
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index a0c3699..c331381 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -90,7 +90,7 @@
 		private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 		private readonly IdGenerator clientIdGenerator;
 		private int consumerIdCounter = 0;
-		private volatile CountDownLatch transportInterruptionProcessingComplete;
+		private long transportInterruptionProcessingComplete;
 		private readonly MessageTransformation messageTransformation;
 		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 		private AdvisoryConsumer advisoryConsumer = null;
@@ -1257,21 +1257,18 @@
 
 			// Ensure that if there's an advisory consumer we don't add it to the
 			// set of consumers that need interruption processing.
-			this.transportInterruptionProcessingComplete =
-				new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
+			Interlocked.Exchange(ref transportInterruptionProcessingComplete, 1);
 
 			if(Tracer.IsDebugEnabled)
 			{
 				Tracer.DebugFormat("Connection[{0}]: Transport interrupted, dispatchers: {1}", this.ConnectionId, dispatchers.Count);
 			}
 
-			SignalInterruptionProcessingNeeded();
-
 			foreach(Session session in this.sessions)
 			{
 				try
 				{
-					session.ClearMessagesInProgress();
+					session.ClearMessagesInProgress(ref transportInterruptionProcessingComplete);
 				}
 				catch(Exception ex)
 				{
@@ -1280,6 +1277,14 @@
 				}
 			}
 
+			if (Interlocked.Decrement(ref transportInterruptionProcessingComplete) > 0)
+			{
+				Tracer.DebugFormat("Transport interrupted - processing required, dispatchers: {0}",
+				                   Interlocked.Read(ref transportInterruptionProcessingComplete));
+
+				SignalInterruptionProcessingNeeded();
+			}
+
 			if(this.ConnectionInterruptedListener != null && !this.closing.Value)
 			{
 				try
@@ -1397,59 +1402,39 @@
 
 		private void WaitForTransportInterruptionProcessingToComplete()
 		{
-			CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-			if(cdl != null)
+			if(!closed.Value && !transportFailed.Value && Interlocked.Read(ref transportInterruptionProcessingComplete) > 0)
 			{
-				if(!closed.Value && cdl.Remaining > 0)
-				{
-					Tracer.WarnFormat("Connection[{0}]: Dispatch paused, waiting for outstanding dispatch interruption " +
-									  "processing ({1}) to complete..", this.ConnectionId, cdl.Remaining);
-					cdl.await(TimeSpan.FromSeconds(10));
-				}
-
+				Tracer.WarnFormat("Connection[{0}]: Dispatch with outstanding dispatch interruption processing count: {1}",
+				                  this.ConnectionId, Interlocked.Read(ref transportInterruptionProcessingComplete));
 				SignalInterruptionProcessingComplete();
 			}
 		}
 
 		internal void TransportInterruptionProcessingComplete()
 		{
-			CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-			if(cdl != null)
+			if (Interlocked.Decrement(ref transportInterruptionProcessingComplete) == 0)
 			{
-				cdl.countDown();
-				try
-				{
-					SignalInterruptionProcessingComplete();
-				}
-				catch
-				{
-				}
+				SignalInterruptionProcessingComplete();
 			}
 		}
 
 		private void SignalInterruptionProcessingComplete()
 		{
-			CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-			if(cdl.Remaining == 0)
+			Tracer.DebugFormat("Connection[{0}]: signalled TransportInterruptionProcessingComplete: {1}",
+			                   this.ConnectionId, Interlocked.Read(ref transportInterruptionProcessingComplete));
+
+			FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+			if(failoverTransport != null)
 			{
+				failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
 				if(Tracer.IsDebugEnabled)
 				{
-					Tracer.DebugFormat("Connection[{0}]: transportInterruptionProcessingComplete.", this.info.ConnectionId);
-				}
-
-				this.transportInterruptionProcessingComplete = null;
-
-				FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
-				if(failoverTransport != null)
-				{
-					failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
-					if(Tracer.IsDebugEnabled)
-					{
-						Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1})" +
-										   " of interruption completion.", this.ConnectionId, failoverTransport);
-					}
+					Tracer.DebugFormat("Connection[{0}]: notified failover transport ({1})" +
+									   " of interruption completion.", this.ConnectionId, failoverTransport);
 				}
 			}
+
+			Interlocked.Exchange(ref transportInterruptionProcessingComplete, 0);
 		}
 
 		private void SignalInterruptionProcessingNeeded()
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index f7c95d1..f8289ae 100755
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -922,7 +922,7 @@
             }
         }
 
-        internal void ClearMessagesInProgress()
+        internal void ClearMessagesInProgress(ref long transportInterruptionProcessingComplete)
         {
             if(this.executor != null)
             {
@@ -940,6 +940,7 @@
                 foreach(MessageConsumer consumer in this.consumers.Values)
                 {
                     consumer.InProgressClearRequired();
+					Interlocked.Increment(ref transportInterruptionProcessingComplete);
 					Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
                 }
             }
diff --git a/src/main/csharp/State/ConnectionStateTracker.cs b/src/main/csharp/State/ConnectionStateTracker.cs
index b61deaa..65814ea 100644
--- a/src/main/csharp/State/ConnectionStateTracker.cs
+++ b/src/main/csharp/State/ConnectionStateTracker.cs
@@ -450,6 +450,8 @@
                             {
                                 ss.RemoveConsumer(id);
                             }
+
+							cs.RecoveringPullConsumers.Remove(id);
                         }
                     }
                 }