QPID-7642: Validate subscription filters on receiving attach
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 652fa35..c85c582 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -53,6 +53,9 @@
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -2078,7 +2081,7 @@
         private BindingInfo(Exchange<?> exchange,
                             final String queueName,
                             String bindingKey,
-                            Map<Symbol, Filter> filters)
+                            Map<Symbol, Filter> filters) throws AmqpErrorException
         {
             String binding = null;
             final Map<String, Object> arguments = new HashMap<>();
@@ -2111,9 +2114,28 @@
                         _actualFilters.put(entry.getKey(), entry.getValue());
                         arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), true);
                     }
-                    else if(!hasMessageFilter && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+                    else if (!hasMessageFilter
+                             && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
                     {
-                        org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+                        org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter =
+                                (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+
+                        // TODO: QPID-7642 - due to inconsistent handling of invalid filters
+                        // by different exchange implementations
+                        // we need to validate filter before creation of binding
+                        try
+                        {
+                            new org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
+                        }
+                        catch (ParseException | SelectorParsingException | TokenMgrError e)
+                        {
+                            Error error = new Error();
+                            error.setCondition(AmqpError.INVALID_FIELD);
+                            error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+                            error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+                            throw new AmqpErrorException(error);
+                        }
+
                         arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), selectorFilter.getValue());
                         _actualFilters.put(entry.getKey(), selectorFilter);
                         hasMessageFilter = true;
@@ -2168,11 +2190,7 @@
 
             final BindingInfo that = (BindingInfo) o;
 
-            if (!_actualFilters.equals(that._actualFilters))
-            {
-                return false;
-            }
-            return _bindings.equals(that._bindings);
+            return _actualFilters.equals(that._actualFilters) && _bindings.equals(that._bindings);
         }
 
         @Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 23905c8..81c0cd8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -31,6 +31,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import javax.security.auth.Subject;
@@ -41,8 +42,10 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Connection;
@@ -56,6 +59,8 @@
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -74,6 +79,7 @@
     private static final String QUEUE_NAME = "testQueue";
     private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
     private static final Symbol QUEUE_CAPABILITY = Symbol.getSymbol("queue");
+    private static final Symbol JMS_SELECTOR_FILTER = Symbol.getSymbol("jms-selector");
     private AMQPConnection_1_0 _connection;
     private VirtualHost<?> _virtualHost;
     private Session_1_0 _session;
@@ -167,9 +173,9 @@
 
         final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
         assertEquals("Unexpected number of queues after attach", 1, queues.size());
-        Queue queue = queues.iterator().next();
+        Queue<?> queue = queues.iterator().next();
 
-        Collection<Consumer<?,?>> consumers = queue.getConsumers();
+        Collection<QueueConsumer<?,?>> consumers = queue.getConsumers();
         assertEquals("Unexpected number of consumers", 2, consumers.size());
     }
 
@@ -195,9 +201,9 @@
         final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
         assertEquals("Unexpected number of queues after attach", 2, queues.size());
 
-        for (Queue queue : queues)
+        for (Queue<?> queue : queues)
         {
-            Collection<Consumer<?,?>> consumers = queue.getConsumers();
+            Collection<QueueConsumer<?,?>> consumers = queue.getConsumers();
             assertEquals("Unexpected number of consumers on queue " + queue.getName(),  1, consumers.size());
         }
     }
@@ -405,6 +411,80 @@
         assertQueues(topicName2, LifetimePolicy.PERMANENT);
     }
 
+    public void testReceiveAttachTopicNonDurableNoContainerWithInvalidSelector() throws Exception
+    {
+        final String linkName = "testLink";
+        final String address = "amq.direct/" + TOPIC_NAME;
+        Attach attach = createTopicAttach(false, linkName, address, true);
+        setSelector(attach, "invalid selector");
+
+        _session.receiveAttach(attach);
+
+        assertAttachFailed(_connection, _session, attach);
+        final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+        assertEquals("Unexpected number of queues after attach", 0, queues.size());
+    }
+
+    public void testReceiveAttachTopicNonDurableNoContainerWithValidSelector() throws Exception
+    {
+        final String linkName = "testLink";
+        final String address = "amq.direct/" + TOPIC_NAME;
+        final String selectorExpression = "test='test'";
+        Attach attach = createTopicAttach(false, linkName, address, true);
+        setSelector(attach, selectorExpression);
+
+        _session.receiveAttach(attach);
+
+        Attach sentAttach = captureAttach(_connection, _session, 0);
+
+        assertEquals("Unexpected name", attach.getName(), sentAttach.getName());
+        assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
+        assertFilter(sentAttach, selectorExpression);
+
+        assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+        Binding binding = findBinding("amq.direct", TOPIC_NAME);
+        assertNotNull("Binding is not found", binding);
+        Map<String,Object> arguments = binding.getArguments();
+        assertNotNull("Unexpected arguments", arguments);
+        assertEquals("Unexpected filter on binding", selectorExpression, arguments.get(AMQPFilterTypes.JMS_SELECTOR.toString()));
+    }
+
+    private void assertFilter(final Attach sentAttach, final String selectorExpression)
+    {
+        Source source = (Source)sentAttach.getSource();
+        Map<Symbol, Filter> filter = source.getFilter();
+        assertNotNull("Filter is not set in response", filter);
+        assertEquals("Unexpected filter size", 1, filter.size());
+        assertTrue("Selector is not found", filter.containsKey(JMS_SELECTOR_FILTER));
+        Filter jmsSelectorFilter = filter.get(JMS_SELECTOR_FILTER);
+        assertTrue("Unexpected selector filter", jmsSelectorFilter instanceof JMSSelectorFilter);
+        assertEquals("Unexpected selector", selectorExpression, ((JMSSelectorFilter) jmsSelectorFilter).getValue());
+    }
+
+    private Binding findBinding(final String exchangeName, final String bindingName)
+    {
+        Exchange exchange = _virtualHost.findConfiguredObject(Exchange.class, exchangeName);
+        Collection<Binding> bindings = exchange.getBindings();
+        Binding binding = null;
+        for (Binding b: bindings)
+        {
+            if (bindingName.equals(b.getName()))
+            {
+                binding = b;
+                break;
+            }
+        }
+        return binding;
+    }
+
+    private void setSelector(final Attach attach, final String selectorExpression)
+    {
+        JMSSelectorFilter selector = new JMSSelectorFilter(selectorExpression);
+        final Map<Symbol, Filter>
+                filter = Collections.<Symbol,Filter>singletonMap(Symbol.getSymbol("jms-selector"), selector);
+        ((Source)attach.getSource()).setFilter(filter);
+    }
 
     private void assertAttachActions(final Queue<?> queue, final Attach receivedAttach)
     {
@@ -436,22 +516,30 @@
                                   final Attach receivedAttach,
                                   final int invocationOffset)
     {
-        ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
-        verify(connection, times(invocationOffset + 1)).sendFrame(eq((short) session.getChannelId()), frameCapture.capture());
-        Attach sentAttach = (Attach) frameCapture.getAllValues().get(invocationOffset);
+        Attach sentAttach = captureAttach(connection, session, invocationOffset);
 
         assertEquals("Unexpected name", receivedAttach.getName(), sentAttach.getName());
         assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
     }
 
+    private Attach captureAttach(final AMQPConnection_1_0 connection,
+                                 final Session_1_0 session,
+                                 final int invocationOffset)
+    {
+        ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
+        verify(connection, times(invocationOffset + 1)).sendFrame(eq((short) session.getChannelId()),
+                                                                  frameCapture.capture());
+        return (Attach) frameCapture.getAllValues().get(invocationOffset);
+    }
+
     private void assertQueues(final String publishingLinkName, final LifetimePolicy expectedLifetimePolicy)
     {
         final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
         assertEquals("Unexpected number of queues after attach", 1, queues.size());
-        Queue queue = queues.iterator().next();
+        Queue<?> queue = queues.iterator().next();
         assertEquals("Unexpected queue durability",
                      expectedLifetimePolicy, queue.getLifetimePolicy());
-        // boolean isDurable = ((Source) attach.getSource()).getDurable() != TerminusDurability.NONE;
+
         Collection<PublishingLink> queuePublishingLinks = queue.getPublishingLinks();
         assertEquals("Unexpected number of publishing links", 1, queuePublishingLinks.size());
         assertEquals("Unexpected link name", publishingLinkName, queuePublishingLinks.iterator().next().getName());