QPID-8114: [Broker-J] Detach link with not-implemented error when unsupported filter is supplied among source filters
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 8919413..b3603d9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -185,6 +185,14 @@
}
+ else if (entry.getValue() instanceof Filter.InvalidFilter)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.NOT_IMPLEMENTED);
+ error.setDescription("Unsupported filter type: " + ((Filter.InvalidFilter)entry.getValue()).getDescriptor());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
}
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
index 70f5ed2..85ae873 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
@@ -113,9 +113,15 @@
}
Object value = handler.parse(in);
- if (value != null && !valueType.isAssignableFrom(value.getClass()))
+ if (value instanceof DescribedType
+ && SpecializedDescribedType.class.isAssignableFrom(valueType)
+ && SpecializedDescribedType.hasInvalidValue((Class<SpecializedDescribedType>)valueType))
{
- String message = String.format("Expected key type is '%s' but got '%s'",
+ value = SpecializedDescribedType.getInvalidValue((Class<SpecializedDescribedType>)valueType, (DescribedType) value);
+ }
+ else if (value != null && !valueType.isAssignableFrom(value.getClass()))
+ {
+ String message = String.format("Expected value type is '%s' but got '%s'",
valueType.getSimpleName(),
value.getClass().getSimpleName());
throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
new file mode 100644
index 0000000..bb913f3
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+public interface SpecializedDescribedType
+{
+ static <X extends SpecializedDescribedType> X getInvalidValue(Class<X> clazz, DescribedType value) {
+ for(Method method : clazz.getMethods())
+ {
+ if(method.getName().equals("getInvalidValue")
+ && method.getParameterCount() == 1
+ && method.getParameterTypes()[0] == DescribedType.class
+ && method.getReturnType() == clazz
+ && (method.getModifiers() & (Modifier.STATIC | Modifier.PUBLIC)) == (Modifier.STATIC | Modifier.PUBLIC))
+ {
+ try
+ {
+ return (X) method.invoke(null, value);
+ }
+ catch (IllegalAccessException | InvocationTargetException e)
+ {
+ return null;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ static <X extends SpecializedDescribedType> boolean hasInvalidValue(Class<X> clazz)
+ {
+ for(Method method : clazz.getMethods())
+ {
+ if(method.getName().equals("getInvalidValue")
+ && method.getParameterCount() == 1
+ && method.getParameterTypes()[0] == DescribedType.class
+ && method.getReturnType() == clazz
+ && (method.getModifiers() & (Modifier.STATIC | Modifier.PUBLIC)) == (Modifier.STATIC | Modifier.PUBLIC))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
index c8dd0d9..caceb2d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
@@ -23,6 +23,20 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
-public interface Filter
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedType;
+import org.apache.qpid.server.protocol.v1_0.codec.SpecializedDescribedType;
+
+public interface Filter extends SpecializedDescribedType
{
+
+ interface InvalidFilter extends Filter
+ {
+ Object getDescriptor();
+ }
+
+ @SuppressWarnings("unused")
+ static Filter getInvalidValue(DescribedType describedType)
+ {
+ return (InvalidFilter) describedType::getDescriptor;
+ }
}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
index 56d6e6f..748407f 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -30,6 +30,8 @@
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.tests.protocol.OutputEncoder;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterConstructor;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterWriter;
public class FrameEncoder implements OutputEncoder
{
@@ -39,6 +41,10 @@
.registerTransactionLayer()
.registerSecurityLayer()
.registerExtensionSoleconnLayer();
+ static{
+ TestFilterConstructor.register(TYPE_REGISTRY);
+ TestFilterWriter.register(TYPE_REGISTRY);
+ }
@Override
public ByteBuffer encode(final Object msg)
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
new file mode 100644
index 0000000..9e610d9
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+
+public class TestFilter implements Filter
+{
+ private final String _value;
+
+ public TestFilter(String value)
+ {
+ _value = value;
+ }
+
+ public String getValue()
+ {
+ return _value;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TestFilter{" + _value + '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ TestFilter that = (TestFilter) o;
+
+ return _value != null ? _value.equals(that._value) : that._value == null;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _value != null ? _value.hashCode() : 0;
+ }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
new file mode 100644
index 0000000..de2cdb9
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
+public class TestFilterConstructor extends AbstractDescribedTypeConstructor<TestFilter>
+{
+ private static final Object[] DESCRIPTORS =
+ {Symbol.valueOf("apache.org:test-filter:string"), UnsignedLong.valueOf(0x0000468C0000000AL)};
+ private static final TestFilterConstructor INSTANCE = new TestFilterConstructor();
+
+ public static void register(DescribedTypeConstructorRegistry registry)
+ {
+ for (Object descriptor : DESCRIPTORS)
+ {
+ registry.register(descriptor, INSTANCE);
+ }
+ }
+
+ @Override
+ public TestFilter construct(Object underlying) throws AmqpErrorException
+ {
+ if (underlying instanceof String)
+ {
+ return new TestFilter((String) underlying);
+ }
+ else
+ {
+ final String msg = String.format("Cannot decode 'apache.org:test-filter:string' from '%s'",
+ underlying == null ? null : underlying.getClass().getSimpleName());
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR, msg);
+ }
+ }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
new file mode 100644
index 0000000..093377e
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
@@ -0,0 +1,46 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.UnsignedLongWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+
+public class TestFilterWriter extends AbstractDescribedTypeWriter<TestFilter>
+{
+ private static final ValueWriter<UnsignedLong> DESCRIPTOR_WRITER = UnsignedLongWriter.getWriter(0x0000468C0000000AL);
+
+ private TestFilterWriter(final Registry registry, final TestFilter object)
+ {
+ super(DESCRIPTOR_WRITER, registry.getValueWriter(object.getValue()));
+ }
+
+ private static final Factory<TestFilter> FACTORY = TestFilterWriter::new;
+
+ public static void register(Registry registry)
+ {
+ registry.register(TestFilter.class, FACTORY);
+ }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
index 027c785..680c7b6 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
@@ -22,14 +22,19 @@
import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import org.junit.After;
@@ -42,19 +47,19 @@
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilter;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.BrokerSpecific;
@@ -154,6 +159,41 @@
}
+ @Test
+ @SpecificationTest(section = "3.5.1", description = "")
+ public void unsupportedFilter() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final Map<Symbol, Filter> filters = new HashMap<>();
+ filters.put(Symbol.valueOf("selector-filter"), new JMSSelectorFilter("index=1"));
+ filters.put(Symbol.valueOf("test-filter"), new TestFilter("foo"));
+ final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachSourceFilter(filters)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach.getName(), is(notNullValue()));
+ assertThat(responseAttach.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach.getRole(), is(Role.SENDER));
+ assertThat(responseAttach.getSource(), is(nullValue()));
+ assertThat(responseAttach.getTarget(), is(nullValue()));
+
+ final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ assertThat(responseDetach.getClosed(), is(true));
+ assertThat(responseDetach.getError(), is(notNullValue()));
+ assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.NOT_IMPLEMENTED)));
+
+ interaction.doCloseConnection();
+ }
+ }
+
private QpidByteBuffer generateMessagePayloadWithApplicationProperties(final Map<String, Object> applicationProperties, String content)
{
MessageEncoder messageEncoder = new MessageEncoder();