AMQNET-722 Connection timing new options
diff --git a/docs/configuration.md b/docs/configuration.md
index 2b3046e..3375d45 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -23,6 +23,7 @@
- **nms.requestTimeout** Timeout value that controls how long the client waits on completion of various synchronous interactions, such as opening a producer or consumer, before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.
- **nms.clientIdPrefix** Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'.
- **nms.connectionIdPrefix** Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is 'ID:'.
+- **nms.maxNewConnectionRatePerSec** Allowed approximated rate for how fast connection factory is allowed to create new connection. If there is more request, they will have to wait. Default value is -1 which means unlimited.
### TCP Transport Configuration options
When connected to a remote using plain TCP these options configure the behaviour of the underlying socket. These options are appended to the connection URI along with the other configuration options, for example:
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index f57c5d4..b656037 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -34,6 +34,7 @@
public static readonly int DEFAULT_IDLE_TIMEOUT;
public static readonly ushort DEFAULT_CHANNEL_MAX;
public static readonly int DEFAULT_MAX_FRAME_SIZE;
+ public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1;
static NmsConnectionInfo()
{
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 66543df..6729bee 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Specialized;
+using System.Threading;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
@@ -36,6 +37,9 @@
private IdGenerator clientIdGenerator;
private IdGenerator connectionIdGenerator;
private readonly object syncRoot = new object();
+
+ DateTime nextAllowedConnectionCreationTime = DateTime.MinValue;
+
public NmsConnectionFactory(string userName, string password)
{
@@ -110,7 +114,7 @@
/// User name value used to authenticate the connection
/// </summary>
public string UserName { get; set; }
-
+
/// <summary>
/// The password value used to authenticate the connection
/// </summary>
@@ -121,7 +125,7 @@
/// before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.
/// </summary>
public long RequestTimeout { get; set; } = NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
-
+
/// <summary>
/// Timeout value that controls how long the client waits on completion of a synchronous message send before returning an error.
/// By default the client will wait indefinitely for a send to complete.
@@ -146,7 +150,7 @@
/// the logs easier. The default prefix is 'ID:'.
/// </summary>
public string ConnectionIdPrefix { get; set; }
-
+
/// <summary>
/// Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory.
/// The default prefix is 'ID:'.
@@ -164,7 +168,15 @@
/// </summary>
public string ClientId { get; set; }
- public IConnection CreateConnection()
+ /// <summary>
+ /// Sets the desired max rate of creating new connections by this factory.
+ ///
+ /// NOTE: During creating new connection if the rate is too high system will
+ /// try to suspend creation execution to force the desired max rate of new connection creation
+ /// </summary>
+ public double MaxNewConnectionRatePerSec { get; set; } = NmsConnectionInfo.DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC;
+
+ public IConnection CreateConnection()
{
return CreateConnection(UserName, Password);
}
@@ -174,15 +186,17 @@
return CreateConnectionAsync(UserName, Password);
}
- public Task<IConnection> CreateConnectionAsync(string userName, string password)
- {
- return Task.FromResult(CreateConnection(userName, password));
- }
-
public IConnection CreateConnection(string userName, string password)
{
+ return CreateConnectionAsync(userName, password).GetAsyncResult();
+ }
+
+ public async Task<IConnection> CreateConnectionAsync(string userName, string password)
+ {
try
{
+ await CheckMaxNewConnectionRate();
+
NmsConnectionInfo connectionInfo = ConfigureConnectionInfo(userName, password);
IProvider provider = ProviderFactory.Create(BrokerUri);
return new NmsConnection(connectionInfo, provider);
@@ -192,7 +206,38 @@
throw NMSExceptionSupport.Create(e);
}
}
+
+ private async Task CheckMaxNewConnectionRate()
+ {
+ if (MaxNewConnectionRatePerSec != NmsConnectionInfo.DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC)
+ {
+ TimeSpan waitTime = TimeSpan.Zero;
+ lock (syncRoot)
+ {
+ TimeSpan waitTimeForNewConnection = TimeSpan.FromMilliseconds(1_000.0 / MaxNewConnectionRatePerSec);
+
+ DateTime now = DateTime.Now;
+ if (nextAllowedConnectionCreationTime > now)
+ {
+ waitTime = (nextAllowedConnectionCreationTime - now) + waitTimeForNewConnection;
+ }
+ else
+ {
+ waitTime = TimeSpan.Zero;
+ nextAllowedConnectionCreationTime = now;
+ }
+
+ nextAllowedConnectionCreationTime += waitTimeForNewConnection;
+ }
+
+ if (waitTime > TimeSpan.Zero)
+ {
+ await Task.Delay(waitTime);
+ }
+ }
+ }
+
public INMSContext CreateContext()
{
return new NmsContext((NmsConnection)CreateConnection(), AcknowledgementMode.AutoAcknowledge);
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index 80d4b89..25f6529 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -35,6 +35,7 @@
public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
public static long DEFAULT_RECONNECT_DELAY = 10;
+ public static double DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR = 0.0d;
public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
public static long DEFAULT_MAX_RECONNECT_DELAY = (long) Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
@@ -69,6 +70,7 @@
public long InitialReconnectDelay { get; set; } = DEFAULT_INITIAL_RECONNECT_DELAY;
public long ReconnectDelay { get; set; } = DEFAULT_RECONNECT_DELAY;
+ public double ReconnectDelayRandomFactor { get; set; } = DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR;
public bool UseReconnectBackOff { get; set; } = DEFAULT_USE_RECONNECT_BACKOFF;
public double ReconnectBackOffMultiplier { get; set; } = DEFAULT_RECONNECT_BACKOFF_MULTIPLIER;
public long MaxReconnectDelay { get; set; } = DEFAULT_MAX_RECONNECT_DELAY;
@@ -597,6 +599,7 @@
private volatile bool recoveryRequired;
private long reconnectAttempts;
private long nextReconnectDelay = -1;
+ private Random random = new Random();
public ReconnectControls(FailoverProvider failoverProvider)
{
@@ -671,7 +674,10 @@
}
}
- return nextReconnectDelay;
+ long randomFactor = (long) ((1 - 2 * random.NextDouble()) *
+ failoverProvider.ReconnectDelayRandomFactor * nextReconnectDelay);
+
+ return Math.Min(failoverProvider.MaxReconnectDelay, nextReconnectDelay + randomFactor);
}
public long RecordNextAttempt()
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index 49a241a..3962e36 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -16,8 +16,12 @@
*/
using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Provider;
+using NMS.AMQP.Test.Provider.Mock;
using NUnit.Framework;
namespace NMS.AMQP.Test
@@ -28,6 +32,16 @@
private static readonly string USER = "USER";
private static readonly string PASSWORD = "PASSWORD";
+ private MockRemotePeer mockPeer;
+
+ [SetUp]
+ public void SetUp()
+ {
+ mockPeer = new MockRemotePeer();
+ mockPeer.Start();
+ ProviderFactory.RegisterProviderFactory("mock", new MockProviderFactory());
+ }
+
[Test]
public void TestConnectionFactoryCreate()
{
@@ -77,7 +91,8 @@
"&nms.clientIDPrefix=clientId" +
"&nms.requestTimeout=1000" +
"&nms.sendTimeout=1000" +
- "&nms.localMessageExpiry=false";
+ "&nms.localMessageExpiry=false" +
+ "&nms.maxNewConnectionRatePerSec=4";
NmsConnectionFactory factory = new NmsConnectionFactory(configuredUri);
@@ -88,6 +103,7 @@
Assert.AreEqual("clientId", factory.ClientIdPrefix);
Assert.AreEqual(1000, factory.RequestTimeout);
Assert.AreEqual(1000, factory.SendTimeout);
+ Assert.AreEqual(4, factory.MaxNewConnectionRatePerSec);
Assert.IsFalse(factory.LocalMessageExpiry);
}
@@ -136,5 +152,53 @@
NmsConnectionFactory factory = new NmsConnectionFactory("bad://127.0.0.1:5763");
Assert.Throws<NMSException>(() => factory.CreateConnection());
}
+
+ [Test, Timeout(12000)]
+ public void TestMaxNewConnectionRatePerSec()
+ {
+ double desiredRatePerSec = 5;
+
+ NmsConnectionFactory factory = new NmsConnectionFactory(
+ "failover:(mock://localhost?mock.failOnConnect=true)" +
+ "?failover.maxReconnectAttempts=0" +
+ "&nms.maxNewConnectionRatePerSec="+desiredRatePerSec);
+
+ int testTimeMs = 5000;
+
+ int mainCounter = 0;
+
+ Parallel.For(0, 4, (i) =>
+ {
+ Stopwatch st = Stopwatch.StartNew();
+ IConnection connection = null;
+ int counter = -1;
+ do
+ {
+ try
+ {
+ counter++;
+ connection = factory.CreateConnection();
+ connection.Start();
+ Assert.Fail("Should have stopped after predefined number of retries.");
+ }
+ catch (NMSException)
+ {
+ }
+ finally
+ {
+ connection?.Close();
+ }
+ } while (st.ElapsedMilliseconds < testTimeMs);
+
+ lock (factory)
+ {
+ mainCounter += counter;
+ }
+ });
+
+ double ratePerSec = 1000.0 * mainCounter / testTimeMs;
+
+ Assert.AreEqual(desiredRatePerSec, ratePerSec, 1);
+ }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
index 52a9f56..ffc17df 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
@@ -69,7 +69,9 @@
"&failover.maxReconnectAttempts=" + (FailoverProvider.DEFAULT_MAX_RECONNECT_ATTEMPTS + 5) +
"&failover.warnAfterReconnectAttempts=" + (FailoverProvider.DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS + 6) +
"&failover.useReconnectBackOff=" + (!FailoverProvider.DEFAULT_USE_RECONNECT_BACKOFF) +
- "&failover.reconnectBackOffMultiplier=" + (FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d));
+ "&failover.reconnectBackOffMultiplier=" + (FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d) +
+ "&failover.reconnectDelayRandomFactor=" +(FailoverProvider.DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR + 1.0d)
+ );
FailoverProvider failover = ProviderFactory.Create(configured) as FailoverProvider;
Assert.IsNotNull(failover);
@@ -82,6 +84,7 @@
Assert.AreEqual(FailoverProvider.DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS + 6, failover.WarnAfterReconnectAttempts);
Assert.AreEqual(!FailoverProvider.DEFAULT_USE_RECONNECT_BACKOFF, failover.UseReconnectBackOff);
Assert.AreEqual(FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d, failover.ReconnectBackOffMultiplier, 0.0);
+ Assert.AreEqual(FailoverProvider.DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR + 1.0d, failover.ReconnectDelayRandomFactor, 0.0);
}
}
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
index 1d7410e..ce7d28a 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
@@ -17,13 +17,13 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.AMQP;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Provider.Failover;
-using Apache.NMS.AMQP.Util;
using Moq;
using NMS.AMQP.Test.Provider.Mock;
using NUnit.Framework;
@@ -205,7 +205,92 @@
Assert.AreEqual(5, mockPeer.ContextStats.ConnectionAttempts);
Assert.AreEqual(5, mockPeer.ContextStats.CloseAttempts);
}
+
+ [Test, Timeout(5000)]
+ public void TestMaxReconnectAttemptsWithBackOffAndMaxReconnectDelay()
+ {
+ NmsConnectionFactory factory = new NmsConnectionFactory(
+ "failover:(mock://localhost?mock.failOnConnect=true)" +
+ "?failover.maxReconnectAttempts=6" +
+ "&failover.maxReconnectDelay=800" +
+ "&failover.reconnectDelay=100" +
+ "&failover.useReconnectBackOff=true");
+ IConnection connection = null;
+ try
+ {
+ connection = factory.CreateConnection();
+ connection.Start();
+ Assert.Fail("Should have stopped after five retries.");
+ }
+ catch (NMSException)
+ {
+ }
+ finally
+ {
+ connection?.Close();
+ }
+
+ Assert.AreEqual(6, mockPeer.ContextStats.ProvidersCreated);
+ Assert.AreEqual(6, mockPeer.ContextStats.ConnectionAttempts);
+ Assert.AreEqual(6, mockPeer.ContextStats.CloseAttempts);
+
+ // Verify if reconnect backoff was performed in expected growing delays
+ IEnumerable<double> expectedDelays = new double[] {100, 200, 400, 800, 800}; // At the end it should actually stop growing cause MaxReconnectDelay should kick in
+ var actualDelays = GetActualReconnectDelays();
+
+ Enumerable.Zip(expectedDelays, actualDelays, (expected, actual) => new { expected, actual })
+ .ToList()
+ .ForEach(p => Assert.AreEqual(p.expected, p.actual, 100));
+ }
+
+ [Test, Timeout(10000)]
+ public void TestMaxReconnectAttemptsWithBackOffAndRandomDelay()
+ {
+ NmsConnectionFactory factory = new NmsConnectionFactory(
+ "failover:(mock://localhost?mock.failOnConnect=true)" +
+ "?failover.maxReconnectAttempts=7" +
+ "&failover.maxReconnectDelay=3200" +
+ "&failover.reconnectDelay=100" +
+ "&failover.useReconnectBackOff=true"+
+ "&failover.reconnectDelayRandomFactor=0.9");
+
+ IConnection connection = null;
+ try
+ {
+ connection = factory.CreateConnection();
+ connection.Start();
+ Assert.Fail("Should have stopped after predefined number of retries.");
+ }
+ catch (NMSException)
+ {
+ }
+ finally
+ {
+ connection?.Close();
+ }
+
+ Assert.AreEqual(7, mockPeer.ContextStats.ProvidersCreated);
+ Assert.AreEqual(7, mockPeer.ContextStats.ConnectionAttempts);
+ Assert.AreEqual(7, mockPeer.ContextStats.CloseAttempts);
+
+ // Verify if reconnect backoff was performed in expected growing delays
+ IEnumerable<double> expectedDelays = new double[] {100, 200, 400, 800, 1600, 3200};
+ var actualDelays = GetActualReconnectDelays();
+
+ double difference = Enumerable.Zip(expectedDelays, actualDelays, (expected, actual) => Math.Abs(expected - actual)).Max();
+ Assert.GreaterOrEqual(difference,80);
+ }
+
+ private IEnumerable<double> GetActualReconnectDelays()
+ {
+ IEnumerable<double> actualDelays = Enumerable
+ .Range(1, mockPeer.ContextStats.ConnectionAttemptsTimestamps.Count - 1)
+ .Select(i => mockPeer.ContextStats.ConnectionAttemptsTimestamps[i] - mockPeer.ContextStats.ConnectionAttemptsTimestamps[i - 1])
+ .Select(a => a.TotalMilliseconds);
+ return actualDelays;
+ }
+
[Test]
public void TestFailureOnCloseIsSwallowed()
{
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
index 7090468..6a15ba5 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
@@ -39,6 +39,8 @@
public int CloseAttempts { get; private set; }
public int RecoverCalls { get; set; }
+ public List<DateTime> ConnectionAttemptsTimestamps { get; set; } = new List<DateTime>();
+
public int GetCreateResourceCalls<T>() where T : INmsResource => createResourceCalls[typeof(T)];
public int GetDestroyResourceCalls<T>() where T : INmsResource => destroyResourceCalls[typeof(T)];
@@ -55,6 +57,7 @@
{
parent?.RecordConnectAttempt();
ConnectionAttempts++;
+ ConnectionAttemptsTimestamps.Add(DateTime.Now);
}
public void RecordCloseAttempt()