Merge pull request #41 from Havret/Pre_buffered_messages_arent_released_when_consumer_closes_down

AMQNET-605: Pre-buffered messages shouldn't be released when consumer closes down
diff --git a/README.md b/README.md
index 6862b99..4d947ad 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,7 @@
 | IMessageProducer | Y * | Anonymous producers are only supported on connections with the ANONYMOUS-RELAY capability. |
 | MsgDeliveryMode.Persistent | Y | Producers will block on send until an outcome is received or will timeout after waiting the RequestTimeout timespan amount. Exceptions may be throw depending on the outcome or if the producer times out. |
 | MsgDeliveryMode.NonPersistent | Y | Producers will not block on send nor expect to receive an outcome. Should an exception be raised from the outcome the exception will be delivered using the the connection ExceptionListener. |
-| IMessageConsumer | Y * | Message Selectors and noLocal filter are not supported. |
+| IMessageConsumer | Y * | NoLocal filter is not supported. |
 | Durable Consumers | Y | |
 | IQueueBrowser | N | The provider will throw NotImplementedException for the ISession create methods. |
 | Configurable NMSMessageID and amqp serializtion | N | For future consideration. The prodiver will generate a MessageID from a sequence and serialize it as a string. |
diff --git a/apache-nms-amqp.sln b/apache-nms-amqp.sln
index 4898062..8cc386e 100644
--- a/apache-nms-amqp.sln
+++ b/apache-nms-amqp.sln
@@ -19,6 +19,10 @@
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache-NMS-AMQP-Interop-Test", "test\Apache-NMS-AMQP-Interop-Test\Apache-NMS-AMQP-Interop-Test.csproj", "{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}"
 EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmark", "benchmark", "{C0489CE7-BB93-48A6-A136-26FA942DB990}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PingPong", "src\PingPong\PingPong.csproj", "{CA6094FA-B615-47E6-A5D5-224BE62CDF70}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -101,6 +105,18 @@
 		{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}.Release|x64.Build.0 = Release|Any CPU
 		{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}.Release|x86.ActiveCfg = Release|Any CPU
 		{E5D009CB-7DDD-4C49-B7EB-361D42D0BF14}.Release|x86.Build.0 = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x64.ActiveCfg = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x64.Build.0 = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x86.ActiveCfg = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Debug|x86.Build.0 = Debug|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|Any CPU.Build.0 = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x64.ActiveCfg = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x64.Build.0 = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x86.ActiveCfg = Release|Any CPU
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70}.Release|x86.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -112,5 +128,6 @@
 		{A56349BE-ED66-4E18-B5FE-E3EA069D2ADD} = {19D7C0B5-0D2B-4459-BE75-6DD353857B28}
 		{08936343-AA11-4409-9599-06762070CF82} = {19D7C0B5-0D2B-4459-BE75-6DD353857B28}
 		{846D7B8A-49F5-408E-9189-402F4A90912D} = {19D7C0B5-0D2B-4459-BE75-6DD353857B28}
+		{CA6094FA-B615-47E6-A5D5-224BE62CDF70} = {C0489CE7-BB93-48A6-A136-26FA942DB990}
 	EndGlobalSection
 EndGlobal
diff --git a/docs/configuration.md b/docs/configuration.md
index 1c4d1e0..2b3046e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -37,8 +37,12 @@
 - **transport.receiveBufferSize** Specifies the SendBufferSize option of the TCP socket.
 - **transport.receiveTimeout** Specifies the ReceiveTimeout option of the TCP socket.
 - **transport.sendTimeout** Specifies the SendTimeout option of the TCP socket.
+- **transport.tcpKeepAliveTime** Specifies how often a keep-alive transmission is sent to an idle connection.
+- **transport.tcpKeepAliveInterval** Specifies how often a keep-alive transmission is sent when no response is received from previous keep-alive transmissions.
 - **transport.tcpNoDelay** Specifies the NoDelay option of the TCP socket.
 
+If *tcpKeepAliveTime* or *tcpKeepAliveInterval* is set, TCP Keep-Alive is enabled.
+
 ### Failover Configuration options
 With failover enabled the client can reconnect to another server automatically when connection to the current server is lost for some reason. The failover URI is always initiated with the failover prefix and a list of URIs for the server(s) is contained inside a set of parentheses. The "nms." options are applied to the overall failover URI, outside the parentheses, and affect the NMS Connection object for its lifetime.
 
diff --git a/src/NMS.AMQP/Meta/ConsumerInfo.cs b/src/NMS.AMQP/Meta/ConsumerInfo.cs
index c6042bc..c75093b 100644
--- a/src/NMS.AMQP/Meta/ConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/ConsumerInfo.cs
@@ -42,7 +42,7 @@
         public string SubscriptionName { get; internal set; } = null;
 
         public bool NoLocal { get; internal set; } = false;
-        public bool HasSelector => !string.IsNullOrEmpty(Selector);
+        public bool HasSelector => !string.IsNullOrWhiteSpace(Selector);
         public bool IsDurable { get; set; }
         public bool IsBrowser { get; set; }
         public bool LocalMessageExpiry { get; set; }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 1481248..a238ace 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -25,6 +25,7 @@
 using Amqp.Types;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Provider.Amqp.Filters;
 using Apache.NMS.AMQP.Provider.Amqp.Message;
 using Apache.NMS.AMQP.Util;
 
@@ -164,14 +165,9 @@
                 filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
             }
 
-            // Selector
-            // qpid jms defines a selector filter as an amqp described type 
-            //      AmqpJmsSelectorType where
-            //          Descriptor = 0x0000468C00000004UL
-            //          Described = "<selector_string>" (type string)
             if (info.HasSelector)
             {
-                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, info.Selector);
+                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, new AmqpNmsSelectorType(info.Selector));
             }
 
             // Assign filters
diff --git a/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs b/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs
new file mode 100644
index 0000000..fb7835d
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Provider.Amqp.Filters
+{
+    /// <summary>
+    /// AMQP described type that carries a message selector in a consumer's attach filter map.
+    /// </summary>
+    /// <remarks>
+    /// qpid jms defines a selector filter as an AMQP described type whose descriptor is
+    /// 0x0000468C00000004 and whose described value is the selector string.
+    /// </remarks>
+    public class AmqpNmsSelectorType : DescribedValue
+    {
+        // Descriptor code of the selector filter; the UL suffix types the literal as ulong, like the constant.
+        private const ulong DESCRIPTOR = 0x0000468C00000004UL;
+
+        /// <summary>
+        /// Wraps <paramref name="selector"/> as the described value of the selector filter.
+        /// </summary>
+        /// <param name="selector">Selector expression, passed on as a string.</param>
+        public AmqpNmsSelectorType(string selector) : base(DESCRIPTOR, selector)
+        {
+        }
+
+        /// <summary>Returns a readable form that includes the wrapped selector value.</summary>
+        public override string ToString() => $"AmqpNmsSelectorType{{ {this.Value} }}";
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Transport/ISecureTransportContext.cs b/src/NMS.AMQP/Transport/ISecureTransportContext.cs
index 4cac12e..97762dd 100644
--- a/src/NMS.AMQP/Transport/ISecureTransportContext.cs
+++ b/src/NMS.AMQP/Transport/ISecureTransportContext.cs
@@ -21,9 +21,6 @@
 {
     public interface ISecureTransportContext : ITransportContext
     {
-        
-        new ISecureTransportContext Copy();
-
         string ServerName { get; set; }
 
         string ClientCertFileName { get; set; }
diff --git a/src/NMS.AMQP/Transport/ITransportContext.cs b/src/NMS.AMQP/Transport/ITransportContext.cs
index a7afefb..05eec6e 100644
--- a/src/NMS.AMQP/Transport/ITransportContext.cs
+++ b/src/NMS.AMQP/Transport/ITransportContext.cs
@@ -33,11 +33,19 @@
         int SendTimeout { get; set; }
         
         bool TcpNoDelay { get; set; }
+
+        /// <summary>
+        /// Gets or sets how often a keep-alive transmission is sent to an idle connection.
+        /// </summary>
+        uint TcpKeepAliveTime { get; set; }
+        
+        /// <summary>
+        /// Gets or sets how often a keep-alive transmission is sent when no response is received from previous keep-alive transmissions.
+        /// </summary>
+        uint TcpKeepAliveInterval { get; set; }
         
         bool IsSecure { get; }
         
-        ITransportContext Copy();
-        
         Task<Connection> CreateAsync(Address address, IHandler handler);
     }
 }
diff --git a/src/NMS.AMQP/Transport/SecureTransportContext.cs b/src/NMS.AMQP/Transport/SecureTransportContext.cs
index cc3913c..08cfc66 100644
--- a/src/NMS.AMQP/Transport/SecureTransportContext.cs
+++ b/src/NMS.AMQP/Transport/SecureTransportContext.cs
@@ -37,8 +37,8 @@
     internal class SecureTransportContext : TransportContext, ISecureTransportContext
     {
         
-        private readonly static List<string> SupportedProtocols;
-        private readonly static Dictionary<string, int> SupportedProtocolValues;
+        private static readonly List<string> SupportedProtocols;
+        private static readonly Dictionary<string, int> SupportedProtocolValues;
 
         #region static Initializer
 
@@ -411,65 +411,6 @@
         }
 
         #endregion
-        
-        #region Copy Methods
-
-
-        protected override void CopyBuilder(Amqp.ConnectionFactory copy)
-        {
-            base.CopyBuilder(copy);
-            
-            copy.SSL.Protocols = connectionBuilder.SSL.Protocols;
-            copy.SSL.CheckCertificateRevocation = connectionBuilder.SSL.CheckCertificateRevocation;
-
-            if (connectionBuilder.SSL.ClientCertificates != null)
-            {
-                copy.SSL.ClientCertificates = new X509CertificateCollection(connectionBuilder.SSL.ClientCertificates);
-            }
-
-        }
-
-        protected override void CopyInto(TransportContext copy)
-        {
-            SecureTransportContext stcCopy = copy as SecureTransportContext;
-
-            // Copy Secure properties.
-
-            // copy keystore properties
-            stcCopy.KeyStoreName = this.KeyStoreName;
-            stcCopy.KeyStorePassword = this.KeyStorePassword;
-            stcCopy.KeyStoreLocation = this.KeyStoreLocation;
-
-            // copy certificate properties
-            stcCopy.AcceptInvalidBrokerCert = this.AcceptInvalidBrokerCert;
-            stcCopy.ServerName = this.ServerName;
-            stcCopy.ClientCertFileName = this.ClientCertFileName;
-            stcCopy.ClientCertPassword = this.ClientCertPassword;
-            stcCopy.ClientCertSubject = this.ClientCertSubject;
-
-            // copy application callback
-            stcCopy.ServerCertificateValidateCallback = this.ServerCertificateValidateCallback;
-            stcCopy.ClientCertificateSelectCallback = this.ClientCertificateSelectCallback;
-            
-            base.CopyInto(copy);
-
-            stcCopy.connectionBuilder.SSL.RemoteCertificateValidationCallback = this.ContextServerCertificateValidation;
-            stcCopy.connectionBuilder.SSL.LocalCertificateSelectionCallback = this.ContextLocalCertificateSelect;
-        }
-
-        public override ITransportContext Copy()
-        {
-            TransportContext copy = new SecureTransportContext();
-            this.CopyInto(copy);
-            return copy;
-        }
-        
-        ISecureTransportContext ISecureTransportContext.Copy()
-        {
-            return this.Copy() as SecureTransportContext;
-        }
-
-        #endregion
     }
 
 }
diff --git a/src/NMS.AMQP/Transport/TransportContext.cs b/src/NMS.AMQP/Transport/TransportContext.cs
index d5aead9..e62c9ac 100644
--- a/src/NMS.AMQP/Transport/TransportContext.cs
+++ b/src/NMS.AMQP/Transport/TransportContext.cs
@@ -26,9 +26,6 @@
 
 namespace Apache.NMS.AMQP.Transport
 {
-
-    #region Transport Context
-
     /// <summary>
     /// Transport management is mainly handled by the AmqpNetLite library, Except for custom transports.
     /// TransportContext should configure the Amqp.ConnectionFactory for the tcp transport properties.
@@ -43,15 +40,25 @@
             connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;            
         }
 
-        #region Transport Options
-
         public int ReceiveBufferSize { get => this.connectionBuilder.TCP.ReceiveBufferSize; set => this.connectionBuilder.TCP.ReceiveBufferSize = value; }
         public int ReceiveTimeout { get => this.connectionBuilder.TCP.ReceiveTimeout; set => this.connectionBuilder.TCP.ReceiveTimeout = value; }
         public int SendBufferSize { get => this.connectionBuilder.TCP.SendBufferSize; set => this.connectionBuilder.TCP.SendBufferSize = value; }
         public int SendTimeout { get => this.connectionBuilder.TCP.SendTimeout; set => this.connectionBuilder.TCP.SendTimeout = value; }
         public bool TcpNoDelay { get => this.connectionBuilder.TCP.NoDelay; set => this.connectionBuilder.TCP.NoDelay = value; }
-        public uint TcpKeepAliveTime { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime = value; }
-        public uint TcpKeepAliveInterval { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval = value; }
+
+        public uint TcpKeepAliveTime
+        {
+            get => this.connectionBuilder.TCP.KeepAlive?.KeepAliveTime ?? default;
+            set => this.TcpKeepAliveSettings.KeepAliveTime = value;
+        }
+
+        public uint TcpKeepAliveInterval
+        {
+            get => this.connectionBuilder.TCP.KeepAlive?.KeepAliveInterval ?? default;
+            set => this.TcpKeepAliveSettings.KeepAliveInterval = value;
+        }
+
+        private TcpKeepAliveSettings TcpKeepAliveSettings => this.connectionBuilder.TCP.KeepAlive ?? (this.connectionBuilder.TCP.KeepAlive = new TcpKeepAliveSettings());
 
         public bool SocketLingerEnabled
         {
@@ -84,42 +91,12 @@
                 }
             }
         }
-        
-        #endregion
 
         public virtual bool IsSecure { get; } = false;
 
-        public virtual ITransportContext Copy()
-        {
-            TransportContext copy = new TransportContext();
-            this.CopyInto(copy);
-            return copy;
-        }
-
         public virtual Task<Connection> CreateAsync(Address address, IHandler handler)
         {
             return connectionBuilder.CreateAsync(address, handler);    
         }
-
-        protected virtual void CopyInto(TransportContext copy)
-        {
-            //copy.factory = this.factory;
-            //copy.UseLogging = this.UseLogging;
-            //Amqp.ConnectionFactory builder = new Amqp.ConnectionFactory();
-            //this.CopyBuilder(builder);
-            //copy.connectionBuilder = builder;
-        }
-
-        protected virtual void CopyBuilder(Amqp.ConnectionFactory copy)
-        {
-            StringDictionary amqpProperties = PropertyUtil.GetProperties(this.connectionBuilder.AMQP);
-            StringDictionary tcpProperties = PropertyUtil.GetProperties(this.connectionBuilder.TCP);
-            PropertyUtil.SetProperties(copy.AMQP, amqpProperties);
-            PropertyUtil.SetProperties(copy.TCP, tcpProperties);
-            copy.SASL.Profile = this.connectionBuilder.SASL.Profile;
-        }
     }
-
-    #endregion
-
 }
diff --git a/src/PingPong/Ping.cs b/src/PingPong/Ping.cs
new file mode 100644
index 0000000..888f4a9
--- /dev/null
+++ b/src/PingPong/Ping.cs
@@ -0,0 +1,97 @@
+using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace PingPong
+{
+    /// <summary>
+    /// Benchmark driver: publishes to the "ping" topic, listens on the "pong" topic and
+    /// reports how long it took to receive the requested number of replies.
+    /// </summary>
+    public class Ping: IDisposable
+    {
+        private readonly IConnection connection;
+        private readonly ISession session;
+        private readonly IMessageProducer messageProducer;
+        private readonly IMessageConsumer messageConsumer;
+        private readonly Stopwatch stopwatch;
+        private readonly ITextMessage pingMessage;
+        private TaskCompletionSource<Stats> tsc;
+        private int numberOfMessages;
+        private int skipMessages;
+        private int counter;
+
+        /// <summary>
+        /// Creates the connection, session, producer and consumer, then starts the connection.
+        /// </summary>
+        /// <param name="connectionFactory">Factory used to create the connection.</param>
+        public Ping(IConnectionFactory connectionFactory)
+        {
+            this.connection = connectionFactory.CreateConnection();
+            this.session = this.connection.CreateSession();
+            this.messageProducer = session.CreateProducer(session.GetTopic("ping"));
+            this.messageConsumer = session.CreateConsumer(session.GetTopic("pong"));
+            this.messageConsumer.Listener += OnMessage;
+            this.stopwatch = new Stopwatch();
+            this.pingMessage = session.CreateTextMessage("Ping");
+            this.connection.Start();
+        }
+
+        // Listener for the "pong" topic: skips warm-up replies, counts the rest and either
+        // completes the run or sends the next ping.
+        private void OnMessage(IMessage message)
+        {
+            if (skipMessages > 0)
+            {
+                skipMessages--;
+
+                // Warm-up just ended: restart the timer so Elapsed covers only counted messages.
+                if (skipMessages == 0)
+                    stopwatch.Restart();
+            }
+            else
+            {
+                counter++;
+            }
+
+            if (counter == numberOfMessages)
+            {
+                stopwatch.Stop();
+                this.tsc.SetResult(new Stats { MessagesCount = counter, Elapsed = stopwatch.Elapsed });
+            }
+            else
+            {
+                messageProducer.Send(pingMessage);
+            }
+        }
+
+        /// <summary>
+        /// Starts a run by sending the first ping.
+        /// </summary>
+        /// <param name="numberOfMessages">Number of replies to count before the run completes.</param>
+        /// <param name="skipMessages">Number of initial replies that are not counted.</param>
+        /// <returns>Task completed with the run's <see cref="Stats"/> once all replies were counted.</returns>
+        public Task<Stats> Start(int numberOfMessages, int skipMessages)
+        {
+            this.numberOfMessages = numberOfMessages;
+            this.skipMessages = skipMessages;
+            this.counter = 0;
+
+            // Continuations must not run inline on the thread that calls OnMessage: the awaiting
+            // caller may dispose this instance as soon as the task completes.
+            this.tsc = new TaskCompletionSource<Stats>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            // Restart instead of Start so a second run does not add to the previous elapsed time.
+            stopwatch.Restart();
+            messageProducer.Send(pingMessage);
+            return this.tsc.Task;
+        }
+
+        /// <summary>Disposes the underlying connection.</summary>
+        public void Dispose()
+        {
+            connection?.Dispose();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/PingPong/PingPong.csproj b/src/PingPong/PingPong.csproj
new file mode 100644
index 0000000..f5ec638
--- /dev/null
+++ b/src/PingPong/PingPong.csproj
@@ -0,0 +1,12 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>netcoreapp2.2</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\NMS.AMQP\Apache-NMS-AMQP.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/src/PingPong/Pong.cs b/src/PingPong/Pong.cs
new file mode 100644
index 0000000..55f955e
--- /dev/null
+++ b/src/PingPong/Pong.cs
@@ -0,0 +1,39 @@
+using System;
+using Apache.NMS;
+
+namespace PingPong
+{
+    /// <summary>
+    /// Echo side of the benchmark: answers every message received on the "ping" topic
+    /// with a message on the "pong" topic.
+    /// </summary>
+    public class Pong : IDisposable
+    {
+        private readonly IConnection connection;
+        private readonly ISession session;
+        private readonly IMessageProducer messageProducer;
+        private readonly IMessageConsumer messageConsumer;
+        private readonly ITextMessage pongMessage;
+
+        /// <summary>
+        /// Sets up the connection, session, producer and consumer, then starts the connection.
+        /// </summary>
+        /// <param name="connectionFactory">Factory used to create the connection.</param>
+        public Pong(IConnectionFactory connectionFactory)
+        {
+            connection = connectionFactory.CreateConnection();
+            session = connection.CreateSession();
+            messageProducer = session.CreateProducer(session.GetTopic("pong"));
+            messageConsumer = session.CreateConsumer(session.GetTopic("ping"));
+            messageConsumer.Listener += OnMessage;
+            pongMessage = session.CreateTextMessage("Pong");
+            connection.Start();
+        }
+
+        // Replies to each incoming message with the pre-built pong message.
+        private void OnMessage(IMessage message) => messageProducer.Send(pongMessage);
+
+        /// <summary>Disposes the underlying connection.</summary>
+        public void Dispose() => connection?.Dispose();
+    }
+}
\ No newline at end of file
diff --git a/src/PingPong/Program.cs b/src/PingPong/Program.cs
new file mode 100644
index 0000000..0054f57
--- /dev/null
+++ b/src/PingPong/Program.cs
@@ -0,0 +1,27 @@
+using System;
+using System.Threading.Tasks;
+using Apache.NMS.AMQP;
+
+namespace PingPong
+{
+    class Program
+    {
+        // Runs one ping-pong benchmark and prints the resulting statistics.
+        static async Task Main(string[] args)
+        {
+            var connectionFactory = new NmsConnectionFactory
+            {
+                UserName = "artemis",
+                Password = "simetraehcapa"
+            };
+
+            using (var ping = new Ping(connectionFactory))
+            using (new Pong(connectionFactory))
+            {
+                // The first 100 replies are skipped before 10000 messages are counted.
+                Stats stats = await ping.Start(numberOfMessages: 10000, skipMessages: 100);
+                Console.WriteLine(stats);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/PingPong/Stats.cs b/src/PingPong/Stats.cs
new file mode 100644
index 0000000..5d5c201
--- /dev/null
+++ b/src/PingPong/Stats.cs
@@ -0,0 +1,22 @@
+using System;
+
+namespace PingPong
+{
+    /// <summary>
+    /// Outcome of a benchmark run: how many messages were counted and how long it took.
+    /// </summary>
+    public class Stats
+    {
+        /// <summary>Number of messages counted during the run.</summary>
+        public int MessagesCount { get; set; }
+
+        /// <summary>Time measured for the run.</summary>
+        public TimeSpan Elapsed { get; set; }
+
+        /// <summary>
+        /// Formats the message count, the elapsed milliseconds and the messages-per-second rate.
+        /// </summary>
+        public override string ToString() =>
+            $"Sent {MessagesCount} msg in {Elapsed.TotalMilliseconds:F2}ms -- {MessagesCount / Elapsed.TotalSeconds:F2} msg/s";
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
new file mode 100644
index 0000000..462b4d7
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test
+{
+    /// <summary>
+    /// Interop tests verifying that a consumer created with a selector receives only the matching message.
+    /// </summary>
+    [TestFixture]
+    public class NmsMessageConsumerTest : AmqpTestSupport
+    {
+        // Expects the "JMSPriority > 8" selector to deliver only the message sent with the highest priority.
+        [Test, Timeout(60_000)]
+        public void TestSelectors()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+
+            // Normal priority: the test expects the selector to leave this one behind.
+            ITextMessage ignored = session.CreateTextMessage("Hello");
+            producer.Send(ignored, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            // Highest priority: the only message the consumer is expected to get.
+            const string expectedText = "Hello + 9";
+            ITextMessage expected = session.CreateTextMessage(expectedText);
+            producer.Send(expected, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
+
+            producer.Close();
+
+            IMessageConsumer consumer = session.CreateConsumer(queue, "JMSPriority > 8");
+            IMessage received = consumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.NotNull(received, "No message was received");
+            Assert.IsInstanceOf<ITextMessage>(received);
+            Assert.AreEqual(expectedText, ((ITextMessage) received).Text);
+            Assert.IsNull(consumer.Receive(TimeSpan.FromSeconds(1)));
+        }
+
+        // Expects a JMSType selector to deliver only the message whose NMSType was set to that type.
+        [Test, Timeout(60_000)]
+        public void TestSelectorsWithJMSType()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+
+            // No NMSType assigned: the test expects the selector to leave this one behind.
+            ITextMessage untyped = session.CreateTextMessage("text");
+            producer.Send(untyped, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            // NMSType assigned: the only message the consumer is expected to get.
+            const string type = "myJMSType";
+            string expectedText = "text" + type;
+            ITextMessage typed = session.CreateTextMessage(expectedText);
+            typed.NMSType = type;
+            producer.Send(typed, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
+
+            producer.Close();
+
+            IMessageConsumer consumer = session.CreateConsumer(queue, $"JMSType = '{type}'");
+            IMessage received = consumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.NotNull(received, "No message was received");
+            Assert.IsInstanceOf<ITextMessage>(received);
+            Assert.AreEqual(expectedText, ((ITextMessage) received).Text);
+            Assert.IsNull(consumer.Receive(TimeSpan.FromSeconds(1)));
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs b/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs
index 8669370..5e69210 100644
--- a/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Transport/TransportContextFactoryTest.cs
@@ -29,6 +29,8 @@
         private int customReceiveTimeout = 1000;
         private int customSendBufferSize = 32 * 1024;
         private int customSendTimeout = 2000;
+        private int customTcpKeepAliveTime = 2500;
+        private int customTcpKeepAliveInterval = 3000;
 
         [Test]
         public void TestCreateWithDefaultOptions()
@@ -54,6 +56,8 @@
                               "transport.receiveTimeout=" + customReceiveTimeout + "&" +
                               "transport.sendBufferSize=" + customSendBufferSize + "&" +
                               "transport.sendTimeout=" + customSendTimeout + "&" +
+                              "transport.tcpKeepAliveTime=" + customTcpKeepAliveTime + "&" +
+                              "transport.tcpKeepAliveInterval=" + customTcpKeepAliveInterval + "&" +
                               "transport.tcpNoDelay=" + customTcpNoDelay);
             ITransportContext transportContext = TransportContextFactory.CreateTransportContext(uri);
 
@@ -64,6 +68,8 @@
             Assert.AreEqual(customReceiveTimeout, transportContext.ReceiveTimeout);
             Assert.AreEqual(customSendBufferSize, transportContext.SendBufferSize);
             Assert.AreEqual(customSendTimeout, transportContext.SendTimeout);
+            Assert.AreEqual(customTcpKeepAliveTime, transportContext.TcpKeepAliveTime);
+            Assert.AreEqual(customTcpKeepAliveInterval, transportContext.TcpKeepAliveInterval);
             Assert.AreEqual(customTcpNoDelay, transportContext.TcpNoDelay);
         }