QPID-7642: Validate subscription filters on receiving attach
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 652fa35..c85c582 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -53,6 +53,9 @@
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -2078,7 +2081,7 @@
private BindingInfo(Exchange<?> exchange,
final String queueName,
String bindingKey,
- Map<Symbol, Filter> filters)
+ Map<Symbol, Filter> filters) throws AmqpErrorException
{
String binding = null;
final Map<String, Object> arguments = new HashMap<>();
@@ -2111,9 +2114,28 @@
_actualFilters.put(entry.getKey(), entry.getValue());
arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), true);
}
- else if(!hasMessageFilter && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+ else if (!hasMessageFilter
+ && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
{
- org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+ org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter =
+ (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+
+ // TODO: QPID-7642 - due to inconsistent handling of invalid filters
+ // by different exchange implementations
+ // we need to validate filter before creation of binding
+ try
+ {
+ new org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
+ }
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+
arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), selectorFilter.getValue());
_actualFilters.put(entry.getKey(), selectorFilter);
hasMessageFilter = true;
@@ -2168,11 +2190,7 @@
final BindingInfo that = (BindingInfo) o;
- if (!_actualFilters.equals(that._actualFilters))
- {
- return false;
- }
- return _bindings.equals(that._bindings);
+ return _actualFilters.equals(that._actualFilters) && _bindings.equals(that._bindings);
}
@Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 23905c8..81c0cd8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -31,6 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import javax.security.auth.Subject;
@@ -41,8 +42,10 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Connection;
@@ -56,6 +59,8 @@
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -74,6 +79,7 @@
private static final String QUEUE_NAME = "testQueue";
private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
private static final Symbol QUEUE_CAPABILITY = Symbol.getSymbol("queue");
+ private static final Symbol JMS_SELECTOR_FILTER = Symbol.getSymbol("jms-selector");
private AMQPConnection_1_0 _connection;
private VirtualHost<?> _virtualHost;
private Session_1_0 _session;
@@ -167,9 +173,9 @@
final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
assertEquals("Unexpected number of queues after attach", 1, queues.size());
- Queue queue = queues.iterator().next();
+ Queue<?> queue = queues.iterator().next();
- Collection<Consumer<?,?>> consumers = queue.getConsumers();
+ Collection<QueueConsumer<?,?>> consumers = queue.getConsumers();
assertEquals("Unexpected number of consumers", 2, consumers.size());
}
@@ -195,9 +201,9 @@
final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
assertEquals("Unexpected number of queues after attach", 2, queues.size());
- for (Queue queue : queues)
+ for (Queue<?> queue : queues)
{
- Collection<Consumer<?,?>> consumers = queue.getConsumers();
+ Collection<QueueConsumer<?,?>> consumers = queue.getConsumers();
assertEquals("Unexpected number of consumers on queue " + queue.getName(), 1, consumers.size());
}
}
@@ -405,6 +411,80 @@
assertQueues(topicName2, LifetimePolicy.PERMANENT);
}
+ public void testReceiveAttachTopicNonDurableNoContainerWithInvalidSelector() throws Exception
+ {
+ final String linkName = "testLink";
+ final String address = "amq.direct/" + TOPIC_NAME;
+ Attach attach = createTopicAttach(false, linkName, address, true);
+ setSelector(attach, "invalid selector");
+
+ _session.receiveAttach(attach);
+
+ assertAttachFailed(_connection, _session, attach);
+ final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
+ assertEquals("Unexpected number of queues after attach", 0, queues.size());
+ }
+
+ public void testReceiveAttachTopicNonDurableNoContainerWithValidSelector() throws Exception
+ {
+ final String linkName = "testLink";
+ final String address = "amq.direct/" + TOPIC_NAME;
+ final String selectorExpression = "test='test'";
+ Attach attach = createTopicAttach(false, linkName, address, true);
+ setSelector(attach, selectorExpression);
+
+ _session.receiveAttach(attach);
+
+ Attach sentAttach = captureAttach(_connection, _session, 0);
+
+ assertEquals("Unexpected name", attach.getName(), sentAttach.getName());
+ assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
+ assertFilter(sentAttach, selectorExpression);
+
+ assertQueues(TOPIC_NAME, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+
+ Binding binding = findBinding("amq.direct", TOPIC_NAME);
+ assertNotNull("Binding is not found", binding);
+ Map<String,Object> arguments = binding.getArguments();
+ assertNotNull("Unexpected arguments", arguments);
+ assertEquals("Unexpected filter on binding", selectorExpression, arguments.get(AMQPFilterTypes.JMS_SELECTOR.toString()));
+ }
+
+ private void assertFilter(final Attach sentAttach, final String selectorExpression)
+ {
+ Source source = (Source)sentAttach.getSource();
+ Map<Symbol, Filter> filter = source.getFilter();
+ assertNotNull("Filter is not set in response", filter);
+ assertEquals("Unexpected filter size", 1, filter.size());
+ assertTrue("Selector is not found", filter.containsKey(JMS_SELECTOR_FILTER));
+ Filter jmsSelectorFilter = filter.get(JMS_SELECTOR_FILTER);
+ assertTrue("Unexpected selector filter", jmsSelectorFilter instanceof JMSSelectorFilter);
+ assertEquals("Unexpected selector", selectorExpression, ((JMSSelectorFilter) jmsSelectorFilter).getValue());
+ }
+
+ private Binding findBinding(final String exchangeName, final String bindingName)
+ {
+ Exchange exchange = _virtualHost.findConfiguredObject(Exchange.class, exchangeName);
+ Collection<Binding> bindings = exchange.getBindings();
+ Binding binding = null;
+ for (Binding b: bindings)
+ {
+ if (bindingName.equals(b.getName()))
+ {
+ binding = b;
+ break;
+ }
+ }
+ return binding;
+ }
+
+ private void setSelector(final Attach attach, final String selectorExpression)
+ {
+ JMSSelectorFilter selector = new JMSSelectorFilter(selectorExpression);
+ final Map<Symbol, Filter>
+ filter = Collections.<Symbol,Filter>singletonMap(Symbol.getSymbol("jms-selector"), selector);
+ ((Source)attach.getSource()).setFilter(filter);
+ }
private void assertAttachActions(final Queue<?> queue, final Attach receivedAttach)
{
@@ -436,22 +516,30 @@
final Attach receivedAttach,
final int invocationOffset)
{
- ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
- verify(connection, times(invocationOffset + 1)).sendFrame(eq((short) session.getChannelId()), frameCapture.capture());
- Attach sentAttach = (Attach) frameCapture.getAllValues().get(invocationOffset);
+ Attach sentAttach = captureAttach(connection, session, invocationOffset);
assertEquals("Unexpected name", receivedAttach.getName(), sentAttach.getName());
assertEquals("Unexpected role", Role.SENDER, sentAttach.getRole());
}
+ private Attach captureAttach(final AMQPConnection_1_0 connection,
+ final Session_1_0 session,
+ final int invocationOffset)
+ {
+ ArgumentCaptor<FrameBody> frameCapture = ArgumentCaptor.forClass(FrameBody.class);
+ verify(connection, times(invocationOffset + 1)).sendFrame(eq((short) session.getChannelId()),
+ frameCapture.capture());
+ return (Attach) frameCapture.getAllValues().get(invocationOffset);
+ }
+
private void assertQueues(final String publishingLinkName, final LifetimePolicy expectedLifetimePolicy)
{
final Collection<Queue> queues = _virtualHost.getChildren(Queue.class);
assertEquals("Unexpected number of queues after attach", 1, queues.size());
- Queue queue = queues.iterator().next();
+ Queue<?> queue = queues.iterator().next();
assertEquals("Unexpected queue durability",
expectedLifetimePolicy, queue.getLifetimePolicy());
- // boolean isDurable = ((Source) attach.getSource()).getDurable() != TerminusDurability.NONE;
+
Collection<PublishingLink> queuePublishingLinks = queue.getPublishingLinks();
assertEquals("Unexpected number of publishing links", 1, queuePublishingLinks.size());
assertEquals("Unexpected link name", publishingLinkName, queuePublishingLinks.iterator().next().getName());