AMQNET-722 Connection timing new options
diff --git a/docs/configuration.md b/docs/configuration.md
index 2b3046e..3375d45 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -23,6 +23,7 @@
 - **nms.requestTimeout** Timeout value that controls how long the client waits on completion of various synchronous interactions, such as opening a producer or consumer, before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.
 - **nms.clientIdPrefix** Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'.
 - **nms.connectionIdPrefix** Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is 'ID:'.
+- **nms.maxNewConnectionRatePerSec** Allowed approximated rate for how fast connection factory is allowed to create new connection. If there is more request, they will have to wait. Default value is -1 which means unlimited.
 
 ### TCP Transport Configuration options
 When connected to a remote using plain TCP these options configure the behaviour of the underlying socket. These options are appended to the connection URI along with the other configuration options, for example:
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index f57c5d4..b656037 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -34,6 +34,7 @@
         public static readonly int DEFAULT_IDLE_TIMEOUT;
         public static readonly ushort DEFAULT_CHANNEL_MAX;
         public static readonly int DEFAULT_MAX_FRAME_SIZE;
+        public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1;
 
         static NmsConnectionInfo()
         {
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 66543df..6729bee 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -17,6 +17,7 @@
 

 using System;

 using System.Collections.Specialized;

+using System.Threading;

 using System.Threading.Tasks;

 using Apache.NMS.AMQP.Meta;

 using Apache.NMS.AMQP.Provider;

@@ -36,6 +37,9 @@
         private IdGenerator clientIdGenerator;

         private IdGenerator connectionIdGenerator;

         private readonly object syncRoot = new object();

+        

+        DateTime nextAllowedConnectionCreationTime = DateTime.MinValue;

+

 

         public NmsConnectionFactory(string userName, string password)

         {

@@ -110,7 +114,7 @@
         /// User name value used to authenticate the connection

         /// </summary>

         public string UserName { get; set; }

-        

+

         /// <summary>

         /// The password value used to authenticate the connection

         /// </summary>

@@ -121,7 +125,7 @@
         /// before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.

         /// </summary>

         public long RequestTimeout { get; set; } = NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;

-        

+

         /// <summary>

         /// Timeout value that controls how long the client waits on completion of a synchronous message send before returning an error.

         /// By default the client will wait indefinitely for a send to complete.

@@ -146,7 +150,7 @@
         /// the logs easier. The default prefix is 'ID:'.

         /// </summary>

         public string ConnectionIdPrefix { get; set; }

-        

+

         /// <summary>

         /// Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory.

         /// The default prefix is 'ID:'.

@@ -164,7 +168,15 @@
         /// </summary>

         public string ClientId { get; set; }

 

-        public IConnection CreateConnection()

+        /// <summary>

+        /// Sets the desired max rate of creating new connections by this factory.

+        ///

+        /// NOTE: During creating new connection if the rate is too high system will

+        /// try to suspend creation execution to force the desired max rate of new connection creation

+        /// </summary>

+        public double MaxNewConnectionRatePerSec { get; set; } = NmsConnectionInfo.DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC;

+        

+    public IConnection CreateConnection()

         {

             return CreateConnection(UserName, Password);

         }

@@ -174,15 +186,17 @@
             return CreateConnectionAsync(UserName, Password);

         }

 

-        public Task<IConnection> CreateConnectionAsync(string userName, string password)

-        {

-            return Task.FromResult(CreateConnection(userName, password));

-        }

-       

         public IConnection CreateConnection(string userName, string password)

         {

+            return CreateConnectionAsync(userName, password).GetAsyncResult();

+        }

+

+        public async Task<IConnection> CreateConnectionAsync(string userName, string password)

+        {

             try

             {

+                await CheckMaxNewConnectionRate();

+

                 NmsConnectionInfo connectionInfo = ConfigureConnectionInfo(userName, password);

                 IProvider provider = ProviderFactory.Create(BrokerUri);

                 return new NmsConnection(connectionInfo, provider);

@@ -192,7 +206,38 @@
                 throw NMSExceptionSupport.Create(e);

             }

         }

+        

+        private async Task CheckMaxNewConnectionRate()

+        {

+            if (MaxNewConnectionRatePerSec != NmsConnectionInfo.DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC)

+            {

+                TimeSpan waitTime = TimeSpan.Zero;

+                lock (syncRoot)

+                {

+                    TimeSpan waitTimeForNewConnection = TimeSpan.FromMilliseconds(1_000.0 / MaxNewConnectionRatePerSec);

+                    

+                    DateTime now = DateTime.Now;

 

+                    if (nextAllowedConnectionCreationTime > now)

+                    {

+                        waitTime = (nextAllowedConnectionCreationTime - now) + waitTimeForNewConnection;

+                    }

+                    else

+                    {

+                        waitTime = TimeSpan.Zero;

+                        nextAllowedConnectionCreationTime = now;

+                    }

+

+                    nextAllowedConnectionCreationTime += waitTimeForNewConnection;

+                }

+

+                if (waitTime > TimeSpan.Zero)

+                {

+                    await Task.Delay(waitTime);

+                }

+            }

+        }

+        

         public INMSContext CreateContext()

         {

             return new NmsContext((NmsConnection)CreateConnection(), AcknowledgementMode.AutoAcknowledge);

diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index 80d4b89..25f6529 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -35,6 +35,7 @@
 
         public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
         public static long DEFAULT_RECONNECT_DELAY = 10;
+        public static double DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR = 0.0d;
         public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
         public static long DEFAULT_MAX_RECONNECT_DELAY = (long) Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
         public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
@@ -69,6 +70,7 @@
 
         public long InitialReconnectDelay { get; set; } = DEFAULT_INITIAL_RECONNECT_DELAY;
         public long ReconnectDelay { get; set; } = DEFAULT_RECONNECT_DELAY;
+        public double ReconnectDelayRandomFactor { get; set; } = DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR;
         public bool UseReconnectBackOff { get; set; } = DEFAULT_USE_RECONNECT_BACKOFF;
         public double ReconnectBackOffMultiplier { get; set; } = DEFAULT_RECONNECT_BACKOFF_MULTIPLIER;
         public long MaxReconnectDelay { get; set; } = DEFAULT_MAX_RECONNECT_DELAY;
@@ -597,6 +599,7 @@
             private volatile bool recoveryRequired;
             private long reconnectAttempts;
             private long nextReconnectDelay = -1;
+            private Random random = new Random();
 
             public ReconnectControls(FailoverProvider failoverProvider)
             {
@@ -671,7 +674,10 @@
                     }
                 }
 
-                return nextReconnectDelay;
+                long randomFactor = (long) ((1 - 2 * random.NextDouble()) *
+                                            failoverProvider.ReconnectDelayRandomFactor * nextReconnectDelay);
+
+                return Math.Min(failoverProvider.MaxReconnectDelay, nextReconnectDelay + randomFactor);
             }
 
             public long RecordNextAttempt()
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index 49a241a..3962e36 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -16,8 +16,12 @@
  */
 
 using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Provider;
+using NMS.AMQP.Test.Provider.Mock;
 using NUnit.Framework;
 
 namespace NMS.AMQP.Test
@@ -28,6 +32,16 @@
         private static readonly string USER = "USER";
         private static readonly string PASSWORD = "PASSWORD";
 
+        private MockRemotePeer mockPeer;
+
+        [SetUp]
+        public void SetUp()
+        {
+            mockPeer = new MockRemotePeer();
+            mockPeer.Start();
+            ProviderFactory.RegisterProviderFactory("mock", new MockProviderFactory());
+        }
+        
         [Test]
         public void TestConnectionFactoryCreate()
         {
@@ -77,7 +91,8 @@
                                 "&nms.clientIDPrefix=clientId" +
                                 "&nms.requestTimeout=1000" +
                                 "&nms.sendTimeout=1000" +
-                                "&nms.localMessageExpiry=false";
+                                "&nms.localMessageExpiry=false" +
+                                "&nms.maxNewConnectionRatePerSec=4";
 
             NmsConnectionFactory factory = new NmsConnectionFactory(configuredUri);
 
@@ -88,6 +103,7 @@
             Assert.AreEqual("clientId", factory.ClientIdPrefix);
             Assert.AreEqual(1000, factory.RequestTimeout);
             Assert.AreEqual(1000, factory.SendTimeout);
+            Assert.AreEqual(4, factory.MaxNewConnectionRatePerSec);
             Assert.IsFalse(factory.LocalMessageExpiry);
         }
         
@@ -136,5 +152,53 @@
             NmsConnectionFactory factory = new NmsConnectionFactory("bad://127.0.0.1:5763");
             Assert.Throws<NMSException>(() => factory.CreateConnection());
         }
+        
+        [Test, Timeout(12000)]
+        public void TestMaxNewConnectionRatePerSec()
+        {
+            double desiredRatePerSec = 5;
+            
+            NmsConnectionFactory factory = new NmsConnectionFactory(
+                "failover:(mock://localhost?mock.failOnConnect=true)" +
+                "?failover.maxReconnectAttempts=0" +
+                "&nms.maxNewConnectionRatePerSec="+desiredRatePerSec);
+
+            int testTimeMs = 5000;
+
+            int mainCounter = 0;
+
+            Parallel.For(0, 4, (i) =>
+            {
+                Stopwatch st = Stopwatch.StartNew();
+                IConnection connection = null;
+                int counter = -1;
+                do
+                {
+                    try
+                    {
+                        counter++;
+                        connection = factory.CreateConnection();
+                        connection.Start();
+                        Assert.Fail("Should have stopped after predefined number of retries.");
+                    }
+                    catch (NMSException)
+                    {
+                    }
+                    finally
+                    {
+                        connection?.Close();
+                    }
+                } while (st.ElapsedMilliseconds < testTimeMs);
+
+                lock (factory)
+                {
+                    mainCounter += counter;
+                }
+            });
+
+            double ratePerSec = 1000.0 * mainCounter / testTimeMs;
+            
+            Assert.AreEqual(desiredRatePerSec, ratePerSec, 1);
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
index 52a9f56..ffc17df 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
@@ -69,7 +69,9 @@
                                     "&failover.maxReconnectAttempts=" + (FailoverProvider.DEFAULT_MAX_RECONNECT_ATTEMPTS + 5) +
                                     "&failover.warnAfterReconnectAttempts=" + (FailoverProvider.DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS + 6) +
                                     "&failover.useReconnectBackOff=" + (!FailoverProvider.DEFAULT_USE_RECONNECT_BACKOFF) +
-                                    "&failover.reconnectBackOffMultiplier=" + (FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d));
+                                    "&failover.reconnectBackOffMultiplier=" + (FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d) +
+                                    "&failover.reconnectDelayRandomFactor=" +(FailoverProvider.DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR + 1.0d)
+            );
 
             FailoverProvider failover = ProviderFactory.Create(configured) as FailoverProvider;
             Assert.IsNotNull(failover);
@@ -82,6 +84,7 @@
             Assert.AreEqual(FailoverProvider.DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS + 6, failover.WarnAfterReconnectAttempts);
             Assert.AreEqual(!FailoverProvider.DEFAULT_USE_RECONNECT_BACKOFF, failover.UseReconnectBackOff);
             Assert.AreEqual(FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d, failover.ReconnectBackOffMultiplier, 0.0);
+            Assert.AreEqual(FailoverProvider.DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR + 1.0d, failover.ReconnectDelayRandomFactor, 0.0);
         }
 
     }
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
index 1d7410e..ce7d28a 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
@@ -17,13 +17,13 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading.Tasks;
 using Apache.NMS;
 using Apache.NMS.AMQP;
 using Apache.NMS.AMQP.Meta;
 using Apache.NMS.AMQP.Provider;
 using Apache.NMS.AMQP.Provider.Failover;
-using Apache.NMS.AMQP.Util;
 using Moq;
 using NMS.AMQP.Test.Provider.Mock;
 using NUnit.Framework;
@@ -205,7 +205,92 @@
             Assert.AreEqual(5, mockPeer.ContextStats.ConnectionAttempts);
             Assert.AreEqual(5, mockPeer.ContextStats.CloseAttempts);
         }
+        
+        [Test, Timeout(5000)]
+        public void TestMaxReconnectAttemptsWithBackOffAndMaxReconnectDelay()
+        {
+            NmsConnectionFactory factory = new NmsConnectionFactory(
+                "failover:(mock://localhost?mock.failOnConnect=true)" +
+                "?failover.maxReconnectAttempts=6" +
+                "&failover.maxReconnectDelay=800" +
+                "&failover.reconnectDelay=100" +
+                "&failover.useReconnectBackOff=true");
 
+            IConnection connection = null;
+            try
+            {
+                connection = factory.CreateConnection();
+                connection.Start();
+                Assert.Fail("Should have stopped after five retries.");
+            }
+            catch (NMSException)
+            {
+            }
+            finally
+            {
+                connection?.Close();
+            }
+
+            Assert.AreEqual(6, mockPeer.ContextStats.ProvidersCreated);
+            Assert.AreEqual(6, mockPeer.ContextStats.ConnectionAttempts);
+            Assert.AreEqual(6, mockPeer.ContextStats.CloseAttempts);
+            
+            // Verify if reconnect backoff was performed in expected growing delays
+            IEnumerable<double> expectedDelays = new double[] {100, 200, 400, 800, 800}; // At the end it should actually stop growing cause MaxReconnectDelay should kick in
+            var actualDelays = GetActualReconnectDelays();
+
+            Enumerable.Zip(expectedDelays, actualDelays, (expected, actual) => new { expected, actual })
+            .ToList()
+            .ForEach(p => Assert.AreEqual(p.expected, p.actual, 100));
+        }
+
+        [Test, Timeout(10000)]
+        public void TestMaxReconnectAttemptsWithBackOffAndRandomDelay()
+        {
+            NmsConnectionFactory factory = new NmsConnectionFactory(
+                "failover:(mock://localhost?mock.failOnConnect=true)" +
+                "?failover.maxReconnectAttempts=7" +
+                "&failover.maxReconnectDelay=3200" +
+                "&failover.reconnectDelay=100" +
+                "&failover.useReconnectBackOff=true"+
+                "&failover.reconnectDelayRandomFactor=0.9");
+
+            IConnection connection = null;
+            try
+            {
+                connection = factory.CreateConnection();
+                connection.Start();
+                Assert.Fail("Should have stopped after predefined number of retries.");
+            }
+            catch (NMSException)
+            {
+            }
+            finally
+            {
+                connection?.Close();
+            }
+
+            Assert.AreEqual(7, mockPeer.ContextStats.ProvidersCreated);
+            Assert.AreEqual(7, mockPeer.ContextStats.ConnectionAttempts);
+            Assert.AreEqual(7, mockPeer.ContextStats.CloseAttempts);
+            
+            // Verify if reconnect backoff was performed in expected growing delays
+            IEnumerable<double> expectedDelays = new double[] {100, 200, 400, 800, 1600, 3200};
+            var actualDelays = GetActualReconnectDelays();
+
+            double difference = Enumerable.Zip(expectedDelays, actualDelays, (expected, actual) => Math.Abs(expected - actual)).Max();
+            Assert.GreaterOrEqual(difference,80);
+        }
+        
+        private IEnumerable<double> GetActualReconnectDelays()
+        {
+            IEnumerable<double> actualDelays = Enumerable
+                .Range(1, mockPeer.ContextStats.ConnectionAttemptsTimestamps.Count - 1)
+                .Select(i => mockPeer.ContextStats.ConnectionAttemptsTimestamps[i] - mockPeer.ContextStats.ConnectionAttemptsTimestamps[i - 1])
+                .Select(a => a.TotalMilliseconds);
+            return actualDelays;
+        }
+       
         [Test]
         public void TestFailureOnCloseIsSwallowed()
         {
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
index 7090468..6a15ba5 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
@@ -39,6 +39,8 @@
         public int CloseAttempts { get; private set; }
         public int RecoverCalls { get; set; }
 
+        public List<DateTime> ConnectionAttemptsTimestamps { get; set; } = new List<DateTime>(); 
+
         public int GetCreateResourceCalls<T>() where T : INmsResource => createResourceCalls[typeof(T)];
 
         public int GetDestroyResourceCalls<T>() where T : INmsResource => destroyResourceCalls[typeof(T)];
@@ -55,6 +57,7 @@
         {
             parent?.RecordConnectAttempt();
             ConnectionAttempts++;
+            ConnectionAttemptsTimestamps.Add(DateTime.Now);
         }
 
         public void RecordCloseAttempt()