Merge pull request #39 from HavretGC/AMQNET-617/support_consumer_selectors

AMQNET-617: Support Message Selectors
diff --git a/README.md b/README.md
index 6862b99..4d947ad 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,7 @@
 | IMessageProducer | Y * | Anonymous producers are only supported on connections with the ANONYMOUS-RELAY capability. |
 | MsgDeliveryMode.Persistent | Y | Producers will block on send until an outcome is received or will timeout after waiting the RequestTimeout timespan amount. Exceptions may be thrown depending on the outcome or if the producer times out. |
 | MsgDeliveryMode.NonPersistent | Y | Producers will not block on send nor expect to receive an outcome. Should an exception be raised from the outcome the exception will be delivered using the connection ExceptionListener. |
-| IMessageConsumer | Y * | Message Selectors and noLocal filter are not supported. |
+| IMessageConsumer | Y * | NoLocal filter is not supported. |
 | Durable Consumers | Y | |
 | IQueueBrowser | N | The provider will throw NotImplementedException for the ISession create methods. |
 | Configurable NMSMessageID and AMQP serialization | N | For future consideration. The provider will generate a MessageID from a sequence and serialize it as a string. |
diff --git a/src/NMS.AMQP/Meta/ConsumerInfo.cs b/src/NMS.AMQP/Meta/ConsumerInfo.cs
index c6042bc..c75093b 100644
--- a/src/NMS.AMQP/Meta/ConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/ConsumerInfo.cs
@@ -42,7 +42,7 @@
         public string SubscriptionName { get; internal set; } = null;
 
         public bool NoLocal { get; internal set; } = false;
-        public bool HasSelector => !string.IsNullOrEmpty(Selector);
+        public bool HasSelector => !string.IsNullOrWhiteSpace(Selector);
         public bool IsDurable { get; set; }
         public bool IsBrowser { get; set; }
         public bool LocalMessageExpiry { get; set; }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 1481248..a238ace 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -25,6 +25,7 @@
 using Amqp.Types;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Provider.Amqp.Filters;
 using Apache.NMS.AMQP.Provider.Amqp.Message;
 using Apache.NMS.AMQP.Util;
 
@@ -164,14 +165,9 @@
                 filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
             }
 
-            // Selector
-            // qpid jms defines a selector filter as an amqp described type 
-            //      AmqpJmsSelectorType where
-            //          Descriptor = 0x0000468C00000004UL
-            //          Described = "<selector_string>" (type string)
             if (info.HasSelector)
             {
-                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, info.Selector);
+                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, new AmqpNmsSelectorType(info.Selector));
             }
 
             // Assign filters
diff --git a/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs b/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs
new file mode 100644
index 0000000..fb7835d
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Provider.Amqp.Filters
+{
+    public class AmqpNmsSelectorType : DescribedValue // Described value wrapping a selector string; attached as the selector filter of a consumer link.
+    {
+        private const ulong DESCRIPTOR = 0x0000468C00000004UL; // UL suffix: the literal itself is ulong, so no implicit long-to-ulong constant conversion is involved.
+
+        public AmqpNmsSelectorType(string selector) : base(DESCRIPTOR, selector) // Passes DESCRIPTOR as the descriptor and the selector text as the described value.
+        {
+        }
+
+        public override string ToString() => $"AmqpNmsSelectorType{{ {this.Value} }}"; // Diagnostic form: type name followed by the wrapped selector in braces.
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
new file mode 100644
index 0000000..462b4d7
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test
+{
+    [TestFixture]
+    public class NmsMessageConsumerTest : AmqpTestSupport
+    {
+        [Test, Timeout(60_000)]
+        public void TestSelectors()
+        {
+            // Only the second message is sent with MsgPriority.Highest; the test expects it alone to pass "JMSPriority > 8".
+            AssertSelectorDeliversOnlyMatch("Hello", "Hello + 9", null, "JMSPriority > 8");
+        }
+
+        [Test, Timeout(60_000)]
+        public void TestSelectorsWithJMSType()
+        {
+            string type = "myJMSType";
+
+            // Only the second message has NMSType set; the selector addresses that value as JMSType.
+            AssertSelectorDeliversOnlyMatch("text", "text" + type, type, $"JMSType = '{type}'");
+        }
+
+        /// <summary>
+        /// Sends two persistent text messages to the queue named after the running test, then
+        /// creates a consumer with <paramref name="selector"/> and asserts that it receives the
+        /// second message and nothing else.
+        /// </summary>
+        /// <param name="rejectedText">Body of the first message, sent with MsgPriority.Normal and no NMSType.</param>
+        /// <param name="acceptedText">Body of the second message, sent with MsgPriority.Highest.</param>
+        /// <param name="acceptedType">NMSType assigned to the second message, or null to leave it unset.</param>
+        /// <param name="selector">Selector expression handed to ISession.CreateConsumer.</param>
+        private void AssertSelectorDeliversOnlyMatch(string rejectedText, string acceptedText, string acceptedType, string selector)
+        {
+            // PurgeQueue is inherited from AmqpTestSupport; presumably it drains messages left in the test queue.
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+
+            ITextMessage rejected = session.CreateTextMessage(rejectedText);
+            producer.Send(rejected, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            ITextMessage accepted = session.CreateTextMessage(acceptedText);
+            // Leave NMSType untouched when no type is requested, exactly as the priority test never assigned it.
+            if (acceptedType != null)
+            {
+                accepted.NMSType = acceptedType;
+            }
+            producer.Send(accepted, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
+
+            producer.Close();
+
+            IMessageConsumer messageConsumer = session.CreateConsumer(queue, selector);
+            IMessage msg = messageConsumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.NotNull(msg, "No message was received");
+            Assert.IsInstanceOf<ITextMessage>(msg);
+            Assert.AreEqual(acceptedText, ((ITextMessage) msg).Text);
+
+            // A second receive must come back empty: the first message must not reach this consumer.
+            Assert.IsNull(messageConsumer.Receive(TimeSpan.FromSeconds(1)));
+        }
+    }
+}
\ No newline at end of file