QPID-8114: [Broker-J] Detach link with not-implemented error when unsupported filter is supplied among source filters
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 8919413..b3603d9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -185,6 +185,14 @@
 
 
                     }
+                    else if (entry.getValue() instanceof Filter.InvalidFilter)
+                    {
+                        Error error = new Error();
+                        error.setCondition(AmqpError.NOT_IMPLEMENTED);
+                        error.setDescription("Unsupported filter type: " + ((Filter.InvalidFilter)entry.getValue()).getDescriptor());
+                        error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+                        throw new AmqpErrorException(error);
+                    }
                 }
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
index 70f5ed2..85ae873 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
@@ -113,9 +113,15 @@
             }
 
             Object value = handler.parse(in);
-            if (value != null && !valueType.isAssignableFrom(value.getClass()))
+            if (value instanceof DescribedType
+                && SpecializedDescribedType.class.isAssignableFrom(valueType)
+                && SpecializedDescribedType.hasInvalidValue((Class<SpecializedDescribedType>)valueType))
             {
-                String message = String.format("Expected key type is '%s' but got '%s'",
+                value = SpecializedDescribedType.getInvalidValue((Class<SpecializedDescribedType>)valueType, (DescribedType) value);
+            }
+            else if (value != null && !valueType.isAssignableFrom(value.getClass()))
+            {
+                String message = String.format("Expected value type is '%s' but got '%s'",
                                                valueType.getSimpleName(),
                                                value.getClass().getSimpleName());
                 throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
new file mode 100644
index 0000000..bb913f3
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SpecializedDescribedType.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+public interface SpecializedDescribedType
+{
+    static <X extends SpecializedDescribedType> X getInvalidValue(Class<X> clazz, DescribedType value) {
+        for(Method method : clazz.getMethods())
+        {
+            if(method.getName().equals("getInvalidValue")
+               && method.getParameterCount() == 1
+               && method.getParameterTypes()[0] == DescribedType.class
+               && method.getReturnType() == clazz
+               && (method.getModifiers() & (Modifier.STATIC | Modifier.PUBLIC)) == (Modifier.STATIC | Modifier.PUBLIC))
+            {
+                try
+                {
+                    return (X) method.invoke(null, value);
+                }
+                catch (IllegalAccessException | InvocationTargetException e)
+                {
+                    return null;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    static <X extends SpecializedDescribedType> boolean hasInvalidValue(Class<X> clazz)
+    {
+        for(Method method : clazz.getMethods())
+        {
+            if(method.getName().equals("getInvalidValue")
+               && method.getParameterCount() == 1
+               && method.getParameterTypes()[0] == DescribedType.class
+               && method.getReturnType() == clazz
+               && (method.getModifiers() & (Modifier.STATIC | Modifier.PUBLIC)) == (Modifier.STATIC | Modifier.PUBLIC))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
index c8dd0d9..caceb2d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Filter.java
@@ -23,6 +23,20 @@
 
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
-public interface Filter
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedType;
+import org.apache.qpid.server.protocol.v1_0.codec.SpecializedDescribedType;
+
+public interface Filter extends SpecializedDescribedType
 {
+
+    interface InvalidFilter extends Filter
+    {
+        Object getDescriptor();
+    }
+
+    @SuppressWarnings("unused")
+    static Filter getInvalidValue(DescribedType describedType)
+    {
+        return (InvalidFilter) describedType::getDescriptor;
+    }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
index 56d6e6f..748407f 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -30,6 +30,8 @@
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.tests.protocol.OutputEncoder;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterConstructor;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilterWriter;
 
 public class FrameEncoder implements OutputEncoder
 {
@@ -39,6 +41,10 @@
                                                                                             .registerTransactionLayer()
                                                                                             .registerSecurityLayer()
                                                                                             .registerExtensionSoleconnLayer();
+    static{
+        TestFilterConstructor.register(TYPE_REGISTRY);
+        TestFilterWriter.register(TYPE_REGISTRY);
+    }
 
     @Override
     public ByteBuffer encode(final Object msg)
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
new file mode 100644
index 0000000..9e610d9
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilter.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+
+public class TestFilter implements Filter
+{
+    private final String _value;
+
+    public TestFilter(String value)
+    {
+        _value = value;
+    }
+
+    public String getValue()
+    {
+        return _value;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "TestFilter{" + _value + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        TestFilter that = (TestFilter) o;
+
+        return _value != null ? _value.equals(that._value) : that._value == null;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _value != null ? _value.hashCode() : 0;
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
new file mode 100644
index 0000000..de2cdb9
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterConstructor.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
+public class TestFilterConstructor extends AbstractDescribedTypeConstructor<TestFilter>
+{
+    private static final Object[] DESCRIPTORS =
+            {Symbol.valueOf("apache.org:test-filter:string"), UnsignedLong.valueOf(0x0000468C0000000AL)};
+    private static final TestFilterConstructor INSTANCE = new TestFilterConstructor();
+
+    public static void register(DescribedTypeConstructorRegistry registry)
+    {
+        for (Object descriptor : DESCRIPTORS)
+        {
+            registry.register(descriptor, INSTANCE);
+        }
+    }
+
+    @Override
+    public TestFilter construct(Object underlying) throws AmqpErrorException
+    {
+        if (underlying instanceof String)
+        {
+            return new TestFilter((String) underlying);
+        }
+        else
+        {
+            final String msg = String.format("Cannot decode 'apache.org:test-filter:string' from '%s'",
+                                             underlying == null ? null : underlying.getClass().getSimpleName());
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR, msg);
+        }
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
new file mode 100644
index 0000000..093377e
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/type/TestFilterWriter.java
@@ -0,0 +1,46 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.type;
+
+import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.UnsignedLongWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+
+public class TestFilterWriter extends AbstractDescribedTypeWriter<TestFilter>
+{
+    private static final ValueWriter<UnsignedLong> DESCRIPTOR_WRITER = UnsignedLongWriter.getWriter(0x0000468C0000000AL);
+
+    private TestFilterWriter(final Registry registry, final TestFilter object)
+    {
+        super(DESCRIPTOR_WRITER, registry.getValueWriter(object.getValue()));
+    }
+
+    private static final Factory<TestFilter> FACTORY = TestFilterWriter::new;
+
+    public static void register(Registry registry)
+    {
+        registry.register(TestFilter.class, FACTORY);
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
index 027c785..680c7b6 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/filter/FilterTest.java
@@ -22,14 +22,19 @@
 
 import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.junit.After;
@@ -42,19 +47,19 @@
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.extensions.type.TestFilter;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 import org.apache.qpid.tests.utils.BrokerSpecific;
@@ -154,6 +159,41 @@
 
     }
 
+    @Test
+    @SpecificationTest(section = "3.5.1", description = "")
+    public void unsupportedFilter() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final Map<Symbol, Filter> filters = new HashMap<>();
+            filters.put(Symbol.valueOf("selector-filter"), new JMSSelectorFilter("index=1"));
+            filters.put(Symbol.valueOf("test-filter"), new TestFilter("foo"));
+            final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachSourceFilter(filters)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.SENDER));
+            assertThat(responseAttach.getSource(), is(nullValue()));
+            assertThat(responseAttach.getTarget(), is(nullValue()));
+
+            final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.NOT_IMPLEMENTED)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
     private QpidByteBuffer generateMessagePayloadWithApplicationProperties(final Map<String, Object> applicationProperties, String content)
     {
         MessageEncoder messageEncoder = new MessageEncoder();