AMQNET-610: Race condition during consumer creation
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
new file mode 100644
index 0000000..f39d7ea
--- /dev/null
+++ b/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
@@ -0,0 +1,17 @@
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP
+{
+    public class NmsDurableTopicSubscriber : NmsMessageConsumer
+    {
+        public NmsDurableTopicSubscriber(Id consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+        {
+        }
+
+        public NmsDurableTopicSubscriber(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+        {
+        }
+
+        protected override bool IsDurableSubscription => true;
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 13a4cf0..34afd0c 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -40,7 +40,7 @@
         {
         }
 
-        public NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
+        protected NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
         {
             Session = session;
             acknowledgementMode = session.AcknowledgementMode;
@@ -56,9 +56,14 @@
                 Selector = selector,
                 NoLocal = noLocal,
                 SubscriptionName = name,
-                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
+                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry,
+                IsDurable = IsDurableSubscription
             };
             deliveryTask = new MessageDeliveryTask(this);
+            
+            Session.Connection.CreateResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
+            
+            Session.Add(this);
 
             if (Session.IsStarted)
                 Start();
@@ -68,6 +73,8 @@
         public ConsumerInfo Info { get; }
         public IDestination Destination => Info.Destination;
 
+        protected virtual bool IsDurableSubscription => false;
+
         public void Dispose()
         {
             try
@@ -186,10 +193,9 @@
 
         private event MessageListener Listener;
 
-        public async Task Init()
+        public Task Init()
         {
-            await Session.Connection.CreateResource(Info);
-            await Session.Connection.StartResource(Info);
+            return Session.Connection.StartResource(Info);
         }
 
         public void OnInboundMessage(InboundMessageDispatch envelope)
@@ -417,10 +423,10 @@
         {
             if (closed.CompareAndSet(false, true))
             {
-                messageQueue.Dispose();
                 failureCause = exception;
                 Session.Remove(this);
                 started.Set(false);
+                messageQueue.Dispose();
             }
         }
 
@@ -500,9 +506,9 @@
         {
             try
             {
-                Session.Connection.StartResource(Info);
+                Session.Connection.StartResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
             }
-            catch (NMSException ex)
+            catch (NMSException)
             {
                 Session.Remove(this);
                 throw;
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index feca455..c5da8e2 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -43,6 +43,10 @@
             {
                 Destination = destination
             };
+            
+            session.Connection.CreateResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
+            
+            session.Add(this);
         }
 
         public ProducerInfo Info { get; }
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 5843606..c9f008d 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -20,6 +20,7 @@
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
+using Amqp;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
 using Apache.NMS.AMQP.Provider;
@@ -99,10 +100,7 @@
 
         public IMessageProducer CreateProducer(IDestination destination)
         {
-            NmsMessageProducer messageProducer = new NmsMessageProducer(producerIdGenerator.GenerateId(), this, destination);
-            messageProducer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
-            producers.TryAdd(messageProducer.Info.Id, messageProducer);
-            return messageProducer;
+            return new NmsMessageProducer(producerIdGenerator.GenerateId(), this, destination);
         }
 
         public IMessageConsumer CreateConsumer(IDestination destination)
@@ -121,8 +119,6 @@
 
             NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
             messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
-
-            consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
             
             return messageConsumer;
         }
@@ -131,10 +127,8 @@
         {
             CheckClosed();
 
-            NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, name, selector, noLocal);
-            messageConsumer.Info.IsDurable = true;
+            NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(consumerIdGenerator.GenerateId(), this, destination, name, selector, noLocal);
             messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
-            consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
 
             return messageConsumer;
         }
@@ -474,14 +468,24 @@
             return producer;
         }
 
+        public void Add(NmsMessageConsumer messageConsumer)
+        {
+            consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
+        }
+        
+        public void Add(NmsMessageProducer messageProducer)
+        {
+            producers.TryAdd(messageProducer.Info.Id, messageProducer);
+        }
+
         public void Remove(NmsMessageConsumer messageConsumer)
         {
-            consumers.TryRemove(messageConsumer.Info.Id, out messageConsumer);
+            consumers.TryRemove(messageConsumer.Info.Id, out _);
         }
 
         public void Remove(NmsMessageProducer messageProducer)
         {
-            producers.TryRemove(messageProducer.Info.Id, out messageProducer);
+            producers.TryRemove(messageProducer.Info.Id, out _);
         }
 
         public void Shutdown(NMSException exception = null)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 03ca15e..3410eb6 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -74,7 +74,7 @@
             underlyingConnection.AddClosedCallback((sender, error) => Provider.OnConnectionClosed(error));
             
             // Wait for connection to be opened
-            await tsc.Task;
+            await tsc.Task.ConfigureAwait(false);
 
             // Create a Session for this connection that is used for Temporary Destinations
             // and perhaps later on management and advisory monitoring.
@@ -130,7 +130,7 @@
         public async Task CreateSession(SessionInfo sessionInfo)
         {
             var amqpSession = new AmqpSession(this, sessionInfo);
-            await amqpSession.Start();
+            await amqpSession.Start().ConfigureAwait(false);
             sessions.TryAdd(sessionInfo.Id, amqpSession);
         }
 
@@ -154,8 +154,7 @@
             {
                 return session;
             }
-
-            throw new Exception();
+            throw new InvalidOperationException($"Amqp Session {sessionId} doesn't exist and cannot be retrieved.");
         }
 
         public void RemoveSession(Id sessionId)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
index d20ce76..ee81348 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
@@ -69,7 +69,6 @@
                     SessionInfo.remoteChannel = begin.RemoteChannel;
                     tcs.TrySetResult(true);
                 });
-            UnderlyingSession.AddClosedCallback((sender, error) => { tcs.TrySetException(ExceptionSupport.GetException(error)); });
             UnderlyingSession.AddClosedCallback((sender, error) =>
             {
                 if (!tcs.TrySetException(ExceptionSupport.GetException(error)))
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index b338555..923ab83 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -128,8 +128,8 @@
                             {
                                 Tracer.Debug($"Connection attempt:[{reconnectAttempts}] to: {target.Scheme}://{target.Host}:{target.Port} in-progress");
                                 provider = ProviderFactory.Create(target);
-                                await provider.Connect(connectionInfo);
-                                await InitializeNewConnection(provider);
+                                await provider.Connect(connectionInfo).ConfigureAwait(false);
+                                await InitializeNewConnection(provider).ConfigureAwait(false);
                                 return;
                             }
                             catch (Exception e)
@@ -171,7 +171,7 @@
                         }
                         else
                         {
-                            await reconnectControl.ScheduleReconnect(Reconnect);
+                            await reconnectControl.ScheduleReconnect(Reconnect).ConfigureAwait(false);
                         }
                     }
                 }
@@ -218,21 +218,21 @@
                 Tracer.Debug($"Signalling connection recovery: {provider}");
 
                 // Allow listener to recover its resources
-                await listener.OnConnectionRecovery(provider);
+                await listener.OnConnectionRecovery(provider).ConfigureAwait(false);
 
                 // Restart consumers, send pull commands, etc.
-                await listener.OnConnectionRecovered(provider);
+                await listener.OnConnectionRecovered(provider).ConfigureAwait(false);
 
                 // Let the client know that connection has restored.
                 listener.OnConnectionRestored(connectedUri);
 
                 // If we try to run pending requests right after the connection is reestablished 
                 // it will result in timeout on the first send request
-                await Task.Delay(50);
+                await Task.Delay(50).ConfigureAwait(false);
 
                 foreach (FailoverRequest request in GetPendingRequests())
                 {
-                    await request.Run();
+                    await request.Run().ConfigureAwait(false);
                 }
 
                 reconnectControl.ConnectionEstablished();
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs b/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
index b54a20f..b6a5684 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
@@ -61,7 +61,7 @@
             {
                 try
                 {
-                    await this.DoTask(activeProvider);
+                    await this.DoTask(activeProvider).ConfigureAwait(false);
                     this.taskCompletionSource.TrySetResult(true);
                     this.failoverProvider.RemoveFailoverRequest(this);
                     this.cancellationTokenSource?.Dispose();