AMQNET-624: Fix failover issue when broker sends open frame and shortly after close frame
diff --git a/.travis.yml b/.travis.yml
index 7767232..1c04b32 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,4 +4,4 @@
 dotnet: 2.2.401
 script:
  - dotnet build -p:AppTargetFramework=netcoreapp2.2 -c Release
- - dotnet test ./test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj -f netcoreapp2.2 -c Release
\ No newline at end of file
+ - dotnet test ./test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj -f netcoreapp2.2 -c Release --filter Category!=Windows
\ No newline at end of file
diff --git a/apache-nms-amqp.sln.DotSettings b/apache-nms-amqp.sln.DotSettings
index 69a5afc..8e25730 100644
--- a/apache-nms-amqp.sln.DotSettings
+++ b/apache-nms-amqp.sln.DotSettings
@@ -1,2 +1,3 @@
 <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
-	<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String></wpf:ResourceDictionary>
\ No newline at end of file
+	<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String>
+	<s:Boolean x:Key="/Default/UserDictionary/Words/=Amqp/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 3410eb6..bcdd5a3 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -71,10 +71,10 @@
             Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
             this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
             underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
-            underlyingConnection.AddClosedCallback((sender, error) => Provider.OnConnectionClosed(error));
-            
+            underlyingConnection.AddClosedCallback(OnClosed);
+
             // Wait for connection to be opened
-            await tsc.Task.ConfigureAwait(false);
+            await this.tsc.Task.ConfigureAwait(false);
 
             // Create a Session for this connection that is used for Temporary Destinations
             // and perhaps later on management and advisory monitoring.
@@ -86,6 +86,24 @@
             await connectionSession.Start().ConfigureAwait(false);
         }
 
+        private void OnClosed(IAmqpObject sender, Error error)
+        {
+            if (Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug($"Connection closed. {error}");
+            }
+
+            bool connectionExplicitlyClosed = error == null;
+            if (!connectionExplicitlyClosed)
+            {
+                var exception = ExceptionSupport.GetException(error);
+                if (!this.tsc.TrySetException(exception))
+                {
+                    Provider.OnConnectionClosed(exception);
+                }
+            }
+        }
+
         internal void OnLocalOpen(Open open)
         {
             open.ContainerId = Info.ClientId;
@@ -122,7 +140,7 @@
                     Info.QueuePrefix = queuePrefix;
                 }
 
-                this.tsc.SetResult(true);
+                this.tsc.TrySetResult(true);
                 Provider.FireConnectionEstablished();
             }
         }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index 99330db..80e2fdd 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -116,18 +116,9 @@
             return connection.Start();
         }
 
-        internal void OnConnectionClosed(Error error)
+        internal void OnConnectionClosed(NMSException exception)
         {
-            if (Tracer.IsDebugEnabled)
-            {
-                Tracer.Debug($"Connection closed. {error}");
-            }
-
-            bool connectionExplicitlyClosed = error == null;
-            if (!connectionExplicitlyClosed)
-            {
-                Listener?.OnConnectionFailure(ExceptionSupport.GetException(error));
-            }
+            Listener?.OnConnectionFailure(exception);
         }
 
         internal void FireConnectionEstablished()
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index 923ab83..18c1dee 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -35,13 +35,13 @@
         public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
         public static long DEFAULT_RECONNECT_DELAY = 10;
         public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
-        public static long DEFAULT_MAX_RECONNECT_DELAY = (long)Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
+        public static long DEFAULT_MAX_RECONNECT_DELAY = (long) Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
         public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
         public static int DEFAULT_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
         public static bool DEFAULT_USE_RECONNECT_BACKOFF = true;
         public static int DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS = 10;
         public static bool DEFAULT_RANDOMIZE_ENABLED = false;
-        
+
         private readonly ReconnectControls reconnectControl;
         private readonly FailoverUriPool uris;
 
@@ -140,7 +140,9 @@
                                 {
                                     provider?.Close();
                                 }
-                                catch { }
+                                catch
+                                {
+                                }
                                 finally
                                 {
                                     provider = null;
@@ -164,7 +166,7 @@
                 {
                     if (provider == null)
                     {
-                        Tracer.Debug($"Connection attempt:[{reconnectControl.ReconnectAttempts}] failed error: {failure.Message}");
+                        Tracer.Debug($"Connection attempt:[{reconnectControl.ReconnectAttempts}] failed error: {failure?.Message}");
                         if (!reconnectControl.IsReconnectAllowed(failure))
                         {
                             ReportReconnectFailure(failure);
@@ -337,7 +339,7 @@
         public Task Recover(Id sessionId)
         {
             CheckClosed();
-            
+
             FailoverRequest request = new FailoverRequest(this, requestTimeout)
             {
                 DoTask = activeProvider => activeProvider.Recover(sessionId),
@@ -404,7 +406,7 @@
         public Task Unsubscribe(string name)
         {
             CheckClosed();
-            
+
             FailoverRequest request = new FailoverRequest(this, SendTimeout)
             {
                 DoTask = activeProvider => activeProvider.Unsubscribe(name),
@@ -524,8 +526,8 @@
                         {
                             failoverRequest.ScheduleTimeout();
                         }
-                    
-                        TriggerReconnectionAttempt();
+
+                        Task.Run(TriggerReconnectionAttempt);
 
                         listener?.OnConnectionInterrupted(failedUri);
                     }
@@ -646,7 +648,7 @@
                 if (failoverProvider.UseReconnectBackOff && ReconnectAttempts > 1)
                 {
                     // Exponential increment of reconnect delay.
-                    nextReconnectDelay = (long)Math.Round(nextReconnectDelay * failoverProvider.ReconnectBackOffMultiplier);
+                    nextReconnectDelay = (long) Math.Round(nextReconnectDelay * failoverProvider.ReconnectBackOffMultiplier);
                     if (nextReconnectDelay > failoverProvider.MaxReconnectDelay)
                     {
                         nextReconnectDelay = failoverProvider.MaxReconnectDelay;
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index cb3f4cc..a3b6306 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -36,7 +36,7 @@
     {
         private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
         
-        [Test, Timeout(20_000), Ignore("Ignore as we cannot detect connection disconnect on Linux.")]
+        [Test, Timeout(20_000), Category("Windows")]
         public void TestFailoverHandlesDropThenRejectionCloseAfterConnect()
         {
             using (TestAmqpPeer originalPeer = new TestAmqpPeer())
@@ -62,7 +62,6 @@
 
                 long ird = 0;
                 long rd = 2000;
-                DateTime start = DateTime.UtcNow;
 
                 NmsConnection connection = EstablishAnonymousConnection("failover.initialReconnectDelay=" + ird + "&failover.reconnectDelay=" + rd + "&failover.maxReconnectAttempts=10", originalPeer,
                     rejectingPeer, finalPeer);
@@ -96,15 +95,7 @@
 
                 rejectingPeer.WaitForAllMatchersToComplete(2000);
 
-                Assert.True(finalConnected.WaitOne(TimeSpan.FromSeconds(5)), "Should connect to final peer");
-                DateTime end = DateTime.UtcNow;
-
-                long margin = 2000;
-
-                // TODO: It is failing because, we are not handling rejected connection properly, when socket connection is established
-                // but broker replies with amqp:connection-establishment-failed. Initially connection is treated as successful, which resets
-                // the attempts counter. As a result next connect attempt is being made without any delay.
-                // Assert.That((end - start).TotalMilliseconds, Is.GreaterThanOrEqualTo(ird + rd).And.LessThanOrEqualTo(ird + rd + margin), "Elapsed time outwith expected range for reconnect");
+                Assert.True(finalConnected.WaitOne(TimeSpan.FromSeconds(10)), "Should connect to final peer");
 
                 finalPeer.ExpectClose();
                 connection.Close();
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index f1981e5..71f7f02 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -156,7 +156,7 @@
 
         public void ExpectOpen(Fields serverProperties = null)
         {
-            ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: new[] { SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER }, serverProperties: null);
+            ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: new[] { SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER }, serverProperties: serverProperties);
         }
 
         public void ExpectOpen(Symbol[] serverCapabilities, Fields serverProperties)
@@ -190,7 +190,7 @@
         public void RejectConnect(Symbol errorType, string errorMessage)
         {
             // Expect a connection, establish through the SASL negotiation and sending of the Open frame
-            Fields serverProperties = new Fields { { SymbolUtil.CONNECTION_ESTABLISH_FAILED, true } };
+            Fields serverProperties = new Fields { { SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE } };
             ExpectSaslAnonymous();
             ExpectOpen(serverProperties: serverProperties);
 
@@ -198,12 +198,10 @@
             IMatcher lastMatcher = GetLastMatcher();
             lastMatcher.WithOnComplete(context =>
             {
-                var close = new Close { Error = new Error(errorType) { Description = errorMessage } };
+                var close = new Close { Error = new Error(errorType) { Description = errorMessage} };
                 context.SendCommand(CONNECTION_CHANNEL, close);
             });
 
-            AddMatcher(new FrameMatcher<Begin>());
-
             var closeMatcher = new FrameMatcher<Close>()
                 .WithAssertion(close => Assert.IsNull(close.Error));