Merge pull request #40 from Havret/support_no_local_message_filter

AMQNET-618: Support noLocal filter
diff --git a/apache-nms-amqp.sln b/apache-nms-amqp.sln
index 4898062..8cc386e 100644
--- a/apache-nms-amqp.sln
+++ b/apache-nms-amqp.sln
@@ -19,6 +19,10 @@
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache-NMS-AMQP-Interop-Test", "test\Apache-NMS-AMQP-Interop-Test\Apache-NMS-AMQP-Interop-Test.csproj", "{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}"
 EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmark", "benchmark", "{C0489CE7-BB93-48A6-A136-26FA942DB990}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PingPong", "src\PingPong\PingPong.csproj", "{CA6094FA-B615-47E6-A5D5-224BE62CDF70}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -101,6 +105,18 @@
 		{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}.Release|x64.Build.0 = Release|Any CPU
 		{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}.Release|x86.ActiveCfg = Release|Any CPU
 		{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}.Release|x86.Build.0 = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x64.ActiveCfg = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x64.Build.0 = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x86.ActiveCfg = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x86.Build.0 = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|Any CPU.Build.0 = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x64.ActiveCfg = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x64.Build.0 = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x86.ActiveCfg = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x86.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -112,5 +128,6 @@
 		{A56349BE-ED66-4E18-B5FE-E3EA069D2ADD} = {19D7C0B5-0D2B-4459-BE75-6DD353857B28}
 		{08936343-AA11-4409-9599-06762070CF82} = {19D7C0B5-0D2B-4459-BE75-6DD353857B28}
 		{846D7B8A-49F5-408E-9189-402F4A90912D} = {19D7C0B5-0D2B-4459-BE75-6DD353857B28}
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70} = {C0489CE7-BB93-48A6-A136-26FA942DB990}
 	EndGlobalSection
 EndGlobal
diff --git a/docs/configuration.md b/docs/configuration.md
index 1c4d1e0..2b3046e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -37,8 +37,12 @@
 - **transport.receiveBufferSize** Specifies the SendBufferSize option of the TCP socket.
 - **transport.receiveTimeout** Specifies the ReceiveTimeout option of the TCP socket.
 - **transport.sendTimeout** Specifies the SendTimeout option of the TCP socket.
+- **transport.tcpKeepAliveTime** Specifies how often a keep-alive transmission is sent to an idle connection.
+- **transport.tcpKeepAliveInterval** Specifies how often a keep-alive transmission is sent when no response is received from previous keep-alive transmissions.
 - **transport.tcpNoDelay** Specifies the NoDelay option of the TCP socket.
 
+If *tcpKeepAliveTime* or *tcpKeepAliveInterval* is set, TCP Keep-Alive is enabled.
+
 ### Failover Configuration options
 With failover enabled the client can reconnect to another server automatically when connection to the current server is lost for some reason. The failover URI is always initiated with the failover prefix and a list of URIs for the server(s) is contained inside a set of parentheses. The "nms." options are applied to the overall failover URI, outside the parentheses, and affect the NMS Connection object for its lifetime.
 
diff --git a/src/NMS.AMQP/Transport/ISecureTransportContext.cs b/src/NMS.AMQP/Transport/ISecureTransportContext.cs
index 4cac12e..97762dd 100644
--- a/src/NMS.AMQP/Transport/ISecureTransportContext.cs
+++ b/src/NMS.AMQP/Transport/ISecureTransportContext.cs
@@ -21,9 +21,6 @@
 {
     public interface ISecureTransportContext : ITransportContext
     {
-        
-        new ISecureTransportContext Copy();
-
         string ServerName { get; set; }
 
         string ClientCertFileName { get; set; }
diff --git a/src/NMS.AMQP/Transport/ITransportContext.cs b/src/NMS.AMQP/Transport/ITransportContext.cs
index a7afefb..05eec6e 100644
--- a/src/NMS.AMQP/Transport/ITransportContext.cs
+++ b/src/NMS.AMQP/Transport/ITransportContext.cs
@@ -33,11 +33,19 @@
         int SendTimeout { get; set; }
         
         bool TcpNoDelay { get; set; }
+
+        /// <summary>
+        /// Gets or sets how often a keep-alive transmission is sent to an idle connection.
+        /// </summary>
+        uint TcpKeepAliveTime { get; set; }
+        
+        /// <summary>
+        /// Gets or sets how often a keep-alive transmission is sent when no response is received from previous keep-alive transmissions.
+        /// </summary>
+        uint TcpKeepAliveInterval { get; set; }
         
         bool IsSecure { get; }
         
-        ITransportContext Copy();
-        
         Task<Connection> CreateAsync(Address address, IHandler handler);
     }
 }
diff --git a/src/NMS.AMQP/Transport/SecureTransportContext.cs b/src/NMS.AMQP/Transport/SecureTransportContext.cs
index cc3913c..08cfc66 100644
--- a/src/NMS.AMQP/Transport/SecureTransportContext.cs
+++ b/src/NMS.AMQP/Transport/SecureTransportContext.cs
@@ -37,8 +37,8 @@
     internal class SecureTransportContext : TransportContext, ISecureTransportContext
     {
         
-        private readonly static List<string> SupportedProtocols;
-        private readonly static Dictionary<string, int> SupportedProtocolValues;
+        private static readonly List<string> SupportedProtocols;
+        private static readonly Dictionary<string, int> SupportedProtocolValues;
 
         #region static Initializer
 
@@ -411,65 +411,6 @@
         }
 
         #endregion
-        
-        #region Copy Methods
-
-
-        protected override void CopyBuilder(Amqp.ConnectionFactory copy)
-        {
-            base.CopyBuilder(copy);
-            
-            copy.SSL.Protocols = connectionBuilder.SSL.Protocols;
-            copy.SSL.CheckCertificateRevocation = connectionBuilder.SSL.CheckCertificateRevocation;
-
-            if (connectionBuilder.SSL.ClientCertificates != null)
-            {
-                copy.SSL.ClientCertificates = new X509CertificateCollection(connectionBuilder.SSL.ClientCertificates);
-            }
-
-        }
-
-        protected override void CopyInto(TransportContext copy)
-        {
-            SecureTransportContext stcCopy = copy as SecureTransportContext;
-
-            // Copy Secure properties.
-
-            // copy keystore properties
-            stcCopy.KeyStoreName = this.KeyStoreName;
-            stcCopy.KeyStorePassword = this.KeyStorePassword;
-            stcCopy.KeyStoreLocation = this.KeyStoreLocation;
-
-            // copy certificate properties
-            stcCopy.AcceptInvalidBrokerCert = this.AcceptInvalidBrokerCert;
-            stcCopy.ServerName = this.ServerName;
-            stcCopy.ClientCertFileName = this.ClientCertFileName;
-            stcCopy.ClientCertPassword = this.ClientCertPassword;
-            stcCopy.ClientCertSubject = this.ClientCertSubject;
-
-            // copy application callback
-            stcCopy.ServerCertificateValidateCallback = this.ServerCertificateValidateCallback;
-            stcCopy.ClientCertificateSelectCallback = this.ClientCertificateSelectCallback;
-            
-            base.CopyInto(copy);
-
-            stcCopy.connectionBuilder.SSL.RemoteCertificateValidationCallback = this.ContextServerCertificateValidation;
-            stcCopy.connectionBuilder.SSL.LocalCertificateSelectionCallback = this.ContextLocalCertificateSelect;
-        }
-
-        public override ITransportContext Copy()
-        {
-            TransportContext copy = new SecureTransportContext();
-            this.CopyInto(copy);
-            return copy;
-        }
-        
-        ISecureTransportContext ISecureTransportContext.Copy()
-        {
-            return this.Copy() as SecureTransportContext;
-        }
-
-        #endregion
     }
 
 }
diff --git a/src/NMS.AMQP/Transport/TransportContext.cs b/src/NMS.AMQP/Transport/TransportContext.cs
index d5aead9..e62c9ac 100644
--- a/src/NMS.AMQP/Transport/TransportContext.cs
+++ b/src/NMS.AMQP/Transport/TransportContext.cs
@@ -26,9 +26,6 @@
 
 namespace Apache.NMS.AMQP.Transport
 {
-
-    #region Transport Context
-
     /// <summary>
     /// Transport management is mainly handled by the AmqpNetLite library, Except for custom transports.
     /// TransportContext should configure the Amqp.ConnectionFactory for the tcp transport properties.
@@ -43,15 +40,25 @@
             connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;            
         }
 
-        #region Transport Options
-
         public int ReceiveBufferSize { get => this.connectionBuilder.TCP.ReceiveBufferSize; set => this.connectionBuilder.TCP.ReceiveBufferSize = value; }
         public int ReceiveTimeout { get => this.connectionBuilder.TCP.ReceiveTimeout; set => this.connectionBuilder.TCP.ReceiveTimeout = value; }
         public int SendBufferSize { get => this.connectionBuilder.TCP.SendBufferSize; set => this.connectionBuilder.TCP.SendBufferSize = value; }
         public int SendTimeout { get => this.connectionBuilder.TCP.SendTimeout; set => this.connectionBuilder.TCP.SendTimeout = value; }
         public bool TcpNoDelay { get => this.connectionBuilder.TCP.NoDelay; set => this.connectionBuilder.TCP.NoDelay = value; }
-        public uint TcpKeepAliveTime { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime = value; }
-        public uint TcpKeepAliveInterval { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval = value; }
+
+        /// <summary>Keep-alive time of the builder's TCP settings; reads as 0 while no keep-alive settings are attached.</summary>
+        public uint TcpKeepAliveTime { get => connectionBuilder.TCP.KeepAlive?.KeepAliveTime ?? 0u; set => this.TcpKeepAliveSettings.KeepAliveTime = value; }
+
+        /// <summary>Keep-alive interval of the builder's TCP settings; reads as 0 while no keep-alive settings are attached.</summary>
+        public uint TcpKeepAliveInterval { get => connectionBuilder.TCP.KeepAlive?.KeepAliveInterval ?? 0u; set => this.TcpKeepAliveSettings.KeepAliveInterval = value; }
+
+        /// <summary>
+        /// Keep-alive settings of the connection builder. When none are attached yet (null),
+        /// a new instance is created, attached to the builder and returned, so writing through
+        /// this property never dereferences null.
+        /// </summary>
+        private TcpKeepAliveSettings TcpKeepAliveSettings
+            => connectionBuilder.TCP.KeepAlive ?? (connectionBuilder.TCP.KeepAlive = new TcpKeepAliveSettings());
 
         public bool SocketLingerEnabled
         {
@@ -84,42 +91,12 @@
                 }
             }
         }
-        
-        #endregion
 
         public virtual bool IsSecure { get; } = false;
 
-        public virtual ITransportContext Copy()
-        {
-            TransportContext copy = new TransportContext();
-            this.CopyInto(copy);
-            return copy;
-        }
-
         public virtual Task<Connection> CreateAsync(Address address, IHandler handler)
         {
             return connectionBuilder.CreateAsync(address, handler);    
         }
-
-        protected virtual void CopyInto(TransportContext copy)
-        {
-            //copy.factory = this.factory;
-            //copy.UseLogging = this.UseLogging;
-            //Amqp.ConnectionFactory builder = new Amqp.ConnectionFactory();
-            //this.CopyBuilder(builder);
-            //copy.connectionBuilder = builder;
-        }
-
-        protected virtual void CopyBuilder(Amqp.ConnectionFactory copy)
-        {
-            StringDictionary amqpProperties = PropertyUtil.GetProperties(this.connectionBuilder.AMQP);
-            StringDictionary tcpProperties = PropertyUtil.GetProperties(this.connectionBuilder.TCP);
-            PropertyUtil.SetProperties(copy.AMQP, amqpProperties);
-            PropertyUtil.SetProperties(copy.TCP, tcpProperties);
-            copy.SASL.Profile = this.connectionBuilder.SASL.Profile;
-        }
     }
-
-    #endregion
-
 }
diff --git a/src/PingPong/Ping.cs b/src/PingPong/Ping.cs
new file mode 100644
index 0000000..888f4a9
--- /dev/null
+++ b/src/PingPong/Ping.cs
@@ -0,0 +1,66 @@
+using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace PingPong
+{
+    /// <summary>Drives a ping-pong run: sends "Ping" on topic "ping" and counts the replies received on topic "pong".</summary>
+    public class Ping: IDisposable
+    {
+        private readonly IConnection connection;
+        private readonly ISession session;
+        private readonly IMessageProducer messageProducer;
+        private readonly IMessageConsumer messageConsumer;
+        private readonly Stopwatch stopwatch = new Stopwatch();
+        private readonly ITextMessage pingMessage;
+        private TaskCompletionSource<Stats> tsc;
+        private int numberOfMessages;
+        private int skipMessages;
+        private int counter;
+        /// <summary>Creates the connection, session, "ping" producer and "pong" consumer, then starts the connection.</summary>
+        public Ping(IConnectionFactory connectionFactory)
+        {
+            this.connection = connectionFactory.CreateConnection();
+            this.session = this.connection.CreateSession();
+            this.messageProducer = session.CreateProducer(session.GetTopic("ping"));
+            this.messageConsumer = session.CreateConsumer(session.GetTopic("pong"));
+            this.messageConsumer.Listener += OnMessage;
+            this.pingMessage = session.CreateTextMessage("Ping");
+            this.connection.Start();
+        }
+
+        /// <summary>Handles one reply: completes the run once the target count is reached, otherwise sends the next ping.</summary>
+        private void OnMessage(IMessage message)
+        {
+            // Warm-up replies are not counted; restart the clock after the last one so Elapsed matches MessagesCount.
+            if (skipMessages <= 0)
+                counter++;
+            else if (--skipMessages == 0)
+                stopwatch.Restart();
+
+            if (counter != numberOfMessages)
+            {
+                messageProducer.Send(pingMessage);
+                return;
+            }
+            stopwatch.Stop();
+            this.tsc.SetResult(new Stats { MessagesCount = counter, Elapsed = stopwatch.Elapsed });
+        }
+
+        /// <summary>Starts a run that skips <paramref name="skipMessages"/> replies, then counts <paramref name="numberOfMessages"/>; the task completes with the stats.</summary>
+        public Task<Stats> Start(int numberOfMessages, int skipMessages)
+        {
+            this.numberOfMessages = numberOfMessages;
+            this.skipMessages = skipMessages;
+            this.counter = 0; // a repeated run must not continue from the previous run's count
+            // Resume awaiters off the listener thread: the awaiting code may dispose this connection.
+            this.tsc = new TaskCompletionSource<Stats>(TaskCreationOptions.RunContinuationsAsynchronously);
+            stopwatch.Restart();
+            messageProducer.Send(pingMessage);
+            return this.tsc.Task;
+        }
+
+        public void Dispose() => connection?.Dispose();
+    }
+}
\ No newline at end of file
diff --git a/src/PingPong/PingPong.csproj b/src/PingPong/PingPong.csproj
new file mode 100644
index 0000000..f5ec638
--- /dev/null
+++ b/src/PingPong/PingPong.csproj
@@ -0,0 +1,12 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>netcoreapp2.2</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\NMS.AMQP\Apache-NMS-AMQP.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/src/PingPong/Pong.cs b/src/PingPong/Pong.cs
new file mode 100644
index 0000000..55f955e
--- /dev/null
+++ b/src/PingPong/Pong.cs
@@ -0,0 +1,35 @@
+using System;
+using Apache.NMS;
+
+namespace PingPong
+{
+    /// <summary>Answers every message received on topic "ping" with a "Pong" text message sent to topic "pong".</summary>
+    public class Pong : IDisposable
+    {
+        private readonly IConnection connection;
+        private readonly ISession session;
+        private readonly IMessageProducer messageProducer;
+        private readonly IMessageConsumer messageConsumer;
+        private readonly ITextMessage pongMessage;
+
+        /// <summary>Creates the connection, session, "pong" producer and "ping" consumer, then starts the connection.</summary>
+        public Pong(IConnectionFactory connectionFactory)
+        {
+            connection = connectionFactory.CreateConnection();
+            session = connection.CreateSession();
+            var replyTopic = session.GetTopic("pong");
+            messageProducer = session.CreateProducer(replyTopic);
+            var requestTopic = session.GetTopic("ping");
+            messageConsumer = session.CreateConsumer(requestTopic);
+            messageConsumer.Listener += OnMessage;
+            pongMessage = session.CreateTextMessage("Pong");
+            connection.Start();
+        }
+
+        /// <summary>Listener callback: replies to any incoming message with the single pre-built pong message.</summary>
+        private void OnMessage(IMessage message) => messageProducer.Send(pongMessage);
+
+        /// <summary>Disposes the connection.</summary>
+        public void Dispose() => connection?.Dispose();
+    }
+}
\ No newline at end of file
diff --git a/src/PingPong/Program.cs b/src/PingPong/Program.cs
new file mode 100644
index 0000000..0054f57
--- /dev/null
+++ b/src/PingPong/Program.cs
@@ -0,0 +1,23 @@
+using System;
+using System.Threading.Tasks;
+using Apache.NMS.AMQP;
+
+namespace PingPong
+{
+    /// <summary>Console entry point of the PingPong project.</summary>
+    class Program
+    {
+        /// <summary>Runs a single Ping/Pong exchange on one connection factory and writes the resulting stats to the console.</summary>
+        static async Task Main(string[] args)
+        {
+            var factory = new NmsConnectionFactory { UserName = "artemis", Password = "simetraehcapa" };
+            using (var pinger = new Ping(factory))
+            using (new Pong(factory))
+            {
+                // Named arguments, so their order is free: 100 replies are skipped, then 10000 are counted.
+                Stats result = await pinger.Start(numberOfMessages: 10000, skipMessages: 100);
+                Console.WriteLine(result);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/PingPong/Stats.cs b/src/PingPong/Stats.cs
new file mode 100644
index 0000000..5d5c201
--- /dev/null
+++ b/src/PingPong/Stats.cs
@@ -0,0 +1,15 @@
+using System;
+
+namespace PingPong
+{
+    /// <summary>Outcome of a ping-pong run: how many messages were counted and how long that took.</summary>
+    public class Stats
+    {
+        /// <summary>Number of messages counted during the run.</summary>
+        public int MessagesCount { get; set; }
+        /// <summary>Time measured for the run.</summary>
+        public TimeSpan Elapsed { get; set; }
+        /// <summary>Renders the count, the elapsed milliseconds and the messages-per-second rate (two decimals each).</summary>
+        public override string ToString() => $"Sent {MessagesCount} msg in {Elapsed.TotalMilliseconds:F2}ms -- {MessagesCount / Elapsed.TotalSeconds:F2} msg/s";
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index d85e93b..cdb25e6 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -232,67 +232,6 @@
             }
         }
 
-        // TODO: To be fixed
-        [Test, Timeout(20_000), Ignore("Ignore")]
-        public void TestCloseDurableSubscriberWithUnackedAndUnconsumedPrefetchedMessages()
-        {
-            using (TestAmqpPeer testPeer = new TestAmqpPeer())
-            {
-                IConnection connection = EstablishConnection(testPeer);
-                connection.Start();
-
-                testPeer.ExpectBegin();
-
-                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-
-                string topicName = "myTopic";
-                string subscriptionName = "mySubscription";
-                ITopic topic = session.GetTopic(topicName);
-
-                int messageCount = 5;
-                // Create a consumer and fill the prefetch with some messages,
-                // which we will consume some of but ack none of.
-                testPeer.ExpectDurableSubscriberAttach(topicName, subscriptionName);
-                testPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), count: messageCount);
-
-                IMessageConsumer durableConsumer = session.CreateDurableConsumer(topic, subscriptionName, null, false);
-
-                int consumeCount = 2;
-                IMessage receivedMessage = null;
-                for (int i = 1; i <= consumeCount; i++)
-                {
-                    receivedMessage = durableConsumer.Receive();
-                    Assert.NotNull(receivedMessage);
-                    Assert.IsInstanceOf<NmsTextMessage>(receivedMessage);
-                }
-
-                // Expect the messages that were not delivered to be released.
-                for (int i = 1; i <= consumeCount; i++)
-                {
-                    testPeer.ExpectDispositionThatIsAcceptedAndSettled();
-                }
-
-                receivedMessage.Acknowledge();
-
-                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
-
-                for (int i = consumeCount + 1; i <= messageCount; i++)
-                {
-                    testPeer.ExpectDispositionThatIsReleasedAndSettled();
-                }
-
-                testPeer.ExpectEnd();
-
-                durableConsumer.Close();
-                session.Close();
-
-                testPeer.ExpectClose();
-                connection.Close();
-
-                testPeer.WaitForAllMatchersToComplete(3000);
-            }
-        }
-
         [Test, Timeout(20_000)]
         public void TestConsumerReceiveThrowsIfConnectionLost()
         {
diff --git a/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs
index 8669370..5e69210 100644
--- a/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs
@@ -29,6 +29,8 @@
         private int customReceiveTimeout = 1000;
         private int customSendBufferSize = 32 * 1024;
         private int customSendTimeout = 2000;
+        private int customTcpKeepAliveTime = 2500;
+        private int customTcpKeepAliveInterval = 3000;
 
         [Test]
         public void TestCreateWithDefaultOptions()
@@ -54,6 +56,8 @@
                               "transport.receiveTimeout=" + customReceiveTimeout + "&" +
                               "transport.sendBufferSize=" + customSendBufferSize + "&" +
                               "transport.sendTimeout=" + customSendTimeout + "&" +
+                              "transport.tcpKeepAliveTime=" + customTcpKeepAliveTime + "&" +
+                              "transport.tcpKeepAliveInterval=" + customTcpKeepAliveInterval + "&" +
                               "transport.tcpNoDelay=" + customTcpNoDelay);
             ITransportContext transportContext = TransportContextFactory.CreateTransportContext(uri);
 
@@ -64,6 +68,8 @@
             Assert.AreEqual(customReceiveTimeout, transportContext.ReceiveTimeout);
             Assert.AreEqual(customSendBufferSize, transportContext.SendBufferSize);
             Assert.AreEqual(customSendTimeout, transportContext.SendTimeout);
+            Assert.AreEqual(customTcpKeepAliveTime, transportContext.TcpKeepAliveTime);
+            Assert.AreEqual(customTcpKeepAliveInterval, transportContext.TcpKeepAliveInterval);
             Assert.AreEqual(customTcpNoDelay, transportContext.TcpNoDelay);
         }