AMQNET-624: Fix failover issue when the broker sends an open frame followed shortly by a close frame
diff --git a/.travis.yml b/.travis.yml
index 7767232..1c04b32 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,4 +4,4 @@
dotnet: 2.2.401
script:
- dotnet build -p:AppTargetFramework=netcoreapp2.2 -c Release
- - dotnet test ./test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj -f netcoreapp2.2 -c Release
\ No newline at end of file
+ - dotnet test ./test/Apache-NMS-AMQP-Test/Apache-NMS-AMQP-Test.csproj -f netcoreapp2.2 -c Release --filter Category!=Windows
\ No newline at end of file
diff --git a/apache-nms-amqp.sln.DotSettings b/apache-nms-amqp.sln.DotSettings
index 69a5afc..8e25730 100644
--- a/apache-nms-amqp.sln.DotSettings
+++ b/apache-nms-amqp.sln.DotSettings
@@ -1,2 +1,3 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
- <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String></wpf:ResourceDictionary>
\ No newline at end of file
+ <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String>
+ <s:Boolean x:Key="/Default/UserDictionary/Words/=Amqp/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 3410eb6..bcdd5a3 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -71,10 +71,10 @@
Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
- underlyingConnection.AddClosedCallback((sender, error) => Provider.OnConnectionClosed(error));
-
+ underlyingConnection.AddClosedCallback(OnClosed);
+
// Wait for connection to be opened
- await tsc.Task.ConfigureAwait(false);
+ await this.tsc.Task.ConfigureAwait(false);
// Create a Session for this connection that is used for Temporary Destinations
// and perhaps later on management and advisory monitoring.
@@ -86,6 +86,24 @@
await connectionSession.Start().ConfigureAwait(false);
}
+ private void OnClosed(IAmqpObject sender, Error error) // Closed-callback registered on underlyingConnection; decides how a close is reported.
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug($"Connection closed. {error}");
+ }
+
+ bool connectionExplicitlyClosed = error == null; // A close that carries no error is treated as an explicit close and is not reported.
+ if (!connectionExplicitlyClosed)
+ {
+ var exception = ExceptionSupport.GetException(error);
+ if (!this.tsc.TrySetException(exception)) // Faults the task awaited while opening; returns false once that task has already completed.
+ {
+ Provider.OnConnectionClosed(exception); // Not consumed by the pending open, so hand the failure to the provider.
+ }
+ }
+ }
+
internal void OnLocalOpen(Open open)
{
open.ContainerId = Info.ClientId;
@@ -122,7 +140,7 @@
Info.QueuePrefix = queuePrefix;
}
- this.tsc.SetResult(true);
+ this.tsc.TrySetResult(true);
Provider.FireConnectionEstablished();
}
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index 99330db..80e2fdd 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -116,18 +116,9 @@
return connection.Start();
}
- internal void OnConnectionClosed(Error error)
+ internal void OnConnectionClosed(NMSException exception)
{
- if (Tracer.IsDebugEnabled)
- {
- Tracer.Debug($"Connection closed. {error}");
- }
-
- bool connectionExplicitlyClosed = error == null;
- if (!connectionExplicitlyClosed)
- {
- Listener?.OnConnectionFailure(ExceptionSupport.GetException(error));
- }
+ Listener?.OnConnectionFailure(exception);
}
internal void FireConnectionEstablished()
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index 923ab83..18c1dee 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -35,13 +35,13 @@
public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
public static long DEFAULT_RECONNECT_DELAY = 10;
public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
- public static long DEFAULT_MAX_RECONNECT_DELAY = (long)Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
+ public static long DEFAULT_MAX_RECONNECT_DELAY = (long) Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
public static int DEFAULT_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
public static bool DEFAULT_USE_RECONNECT_BACKOFF = true;
public static int DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS = 10;
public static bool DEFAULT_RANDOMIZE_ENABLED = false;
-
+
private readonly ReconnectControls reconnectControl;
private readonly FailoverUriPool uris;
@@ -140,7 +140,9 @@
{
provider?.Close();
}
- catch { }
+ catch
+ {
+ }
finally
{
provider = null;
@@ -164,7 +166,7 @@
{
if (provider == null)
{
- Tracer.Debug($"Connection attempt:[{reconnectControl.ReconnectAttempts}] failed error: {failure.Message}");
+ Tracer.Debug($"Connection attempt:[{reconnectControl.ReconnectAttempts}] failed error: {failure?.Message}");
if (!reconnectControl.IsReconnectAllowed(failure))
{
ReportReconnectFailure(failure);
@@ -337,7 +339,7 @@
public Task Recover(Id sessionId)
{
CheckClosed();
-
+
FailoverRequest request = new FailoverRequest(this, requestTimeout)
{
DoTask = activeProvider => activeProvider.Recover(sessionId),
@@ -404,7 +406,7 @@
public Task Unsubscribe(string name)
{
CheckClosed();
-
+
FailoverRequest request = new FailoverRequest(this, SendTimeout)
{
DoTask = activeProvider => activeProvider.Unsubscribe(name),
@@ -524,8 +526,8 @@
{
failoverRequest.ScheduleTimeout();
}
-
- TriggerReconnectionAttempt();
+
+ Task.Run(TriggerReconnectionAttempt);
listener?.OnConnectionInterrupted(failedUri);
}
@@ -646,7 +648,7 @@
if (failoverProvider.UseReconnectBackOff && ReconnectAttempts > 1)
{
// Exponential increment of reconnect delay.
- nextReconnectDelay = (long)Math.Round(nextReconnectDelay * failoverProvider.ReconnectBackOffMultiplier);
+ nextReconnectDelay = (long) Math.Round(nextReconnectDelay * failoverProvider.ReconnectBackOffMultiplier);
if (nextReconnectDelay > failoverProvider.MaxReconnectDelay)
{
nextReconnectDelay = failoverProvider.MaxReconnectDelay;
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index cb3f4cc..a3b6306 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -36,7 +36,7 @@
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
- [Test, Timeout(20_000), Ignore("Ignore as we cannot detect connection disconnect on Linux.")]
+ [Test, Timeout(20_000), Category("Windows")]
public void TestFailoverHandlesDropThenRejectionCloseAfterConnect()
{
using (TestAmqpPeer originalPeer = new TestAmqpPeer())
@@ -62,7 +62,6 @@
long ird = 0;
long rd = 2000;
- DateTime start = DateTime.UtcNow;
NmsConnection connection = EstablishAnonymousConnection("failover.initialReconnectDelay=" + ird + "&failover.reconnectDelay=" + rd + "&failover.maxReconnectAttempts=10", originalPeer,
rejectingPeer, finalPeer);
@@ -96,15 +95,7 @@
rejectingPeer.WaitForAllMatchersToComplete(2000);
- Assert.True(finalConnected.WaitOne(TimeSpan.FromSeconds(5)), "Should connect to final peer");
- DateTime end = DateTime.UtcNow;
-
- long margin = 2000;
-
- // TODO: It is failing because, we are not handling rejected connection properly, when socket connection is established
- // but broker replies with amqp:connection-establishment-failed. Initially connection is treated as successful, which resets
- // the attempts counter. As a result next connect attempt is being made without any delay.
- // Assert.That((end - start).TotalMilliseconds, Is.GreaterThanOrEqualTo(ird + rd).And.LessThanOrEqualTo(ird + rd + margin), "Elapsed time outwith expected range for reconnect");
+ Assert.True(finalConnected.WaitOne(TimeSpan.FromSeconds(10)), "Should connect to final peer");
finalPeer.ExpectClose();
connection.Close();
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index f1981e5..71f7f02 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -156,7 +156,7 @@
public void ExpectOpen(Fields serverProperties = null)
{
- ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: new[] { SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER }, serverProperties: null);
+ ExpectOpen(desiredCapabilities: DEFAULT_DESIRED_CAPABILITIES, serverCapabilities: new[] { SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER }, serverProperties: serverProperties);
}
public void ExpectOpen(Symbol[] serverCapabilities, Fields serverProperties)
@@ -190,7 +190,7 @@
public void RejectConnect(Symbol errorType, string errorMessage)
{
// Expect a connection, establish through the SASL negotiation and sending of the Open frame
- Fields serverProperties = new Fields { { SymbolUtil.CONNECTION_ESTABLISH_FAILED, true } };
+ Fields serverProperties = new Fields { { SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE } };
ExpectSaslAnonymous();
ExpectOpen(serverProperties: serverProperties);
@@ -198,12 +198,10 @@
IMatcher lastMatcher = GetLastMatcher();
lastMatcher.WithOnComplete(context =>
{
- var close = new Close { Error = new Error(errorType) { Description = errorMessage } };
+ var close = new Close { Error = new Error(errorType) { Description = errorMessage} };
context.SendCommand(CONNECTION_CHANNEL, close);
});
- AddMatcher(new FrameMatcher<Begin>());
-
var closeMatcher = new FrameMatcher<Close>()
.WithAssertion(close => Assert.IsNull(close.Error));