QPID-7694:[Broker-J][AMQP 0-8..0-10] Add validation for exchange declare arguments
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 5dda560..d97f1c6 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -22,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -30,7 +31,10 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
@@ -57,9 +61,12 @@
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NoFactoryForTypeException;
import org.apache.qpid.server.model.Queue;
@@ -870,17 +877,6 @@
String exchangeName = method.getExchange();
NamedAddressSpace addressSpace = getAddressSpace(session);
- //we must check for any unsupported arguments present and throw not-implemented
- if(method.hasArguments())
- {
- Map<String,Object> args = method.getArguments();
- //QPID-3392: currently we don't support any!
- if(!args.isEmpty())
- {
- exception(session, method, ExecutionErrorCode.NOT_IMPLEMENTED, "Unsupported exchange argument(s) found " + args.keySet().toString());
- return;
- }
- }
String alternateExchangeName = method.getAlternateExchange();
if(nameNullOrEmpty(method.getExchange()))
{
@@ -926,7 +922,10 @@
try
{
Map<String,Object> attributes = new HashMap<String, Object>();
-
+ if(method.hasArguments())
+ {
+ attributes.putAll(method.getArguments());
+ }
attributes.put(org.apache.qpid.server.model.Exchange.NAME, method.getExchange());
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, method.getType());
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
@@ -939,6 +938,7 @@
attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING,
Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
}
+ validateExchangeDeclareArguments(attributes, session.getAMQPConnection().getModel());
addressSpace.createMessageDestination(Exchange.class, attributes);;
}
catch(ReservedExchangeNameException e)
@@ -997,6 +997,24 @@
}
}
+ private void validateExchangeDeclareArguments(final Map<String, Object> attributes, final Model model)
+ {
+ final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
+ final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
+ typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
+ final Set<String> unsupported = attributes.keySet()
+ .stream()
+ .filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName())
+ && !a.isDerived()))
+ .collect(Collectors.toSet());
+
+ if (!unsupported.isEmpty())
+ {
+ throw new IllegalArgumentException(String.format(
+ "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+ }
+ }
+
private void exception(ServerSession session, Method method, ExecutionErrorCode errorCode, String description)
{
ExecutionException ex = new ExecutionException();
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index fc85de2..a70c525 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -35,10 +35,12 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import javax.security.auth.Subject;
@@ -77,6 +79,8 @@
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -2624,6 +2628,7 @@
attributes.put(Exchange.ALTERNATE_BINDING,
Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
}
+ validateExchangeDeclareArguments(attributes);
exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
if (!nowait)
@@ -2689,7 +2694,7 @@
}
catch (IllegalArgumentException | IllegalConfigurationException e)
{
- _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, "Error creating exchange '"
+ _connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, "Error creating exchange '"
+ exchangeName
+ "': "
+ e.getMessage(), getChannelId());
@@ -2700,6 +2705,24 @@
}
+ private void validateExchangeDeclareArguments(final Map<String, Object> attributes)
+ {
+ final ConfiguredObjectTypeRegistry typeRegistry = getModel().getTypeRegistry();
+ final List<ConfiguredObjectAttribute<?, ?>> types = new ArrayList<>(typeRegistry.getAttributeTypes(Exchange.class).values());
+ typeRegistry.getTypeSpecialisations(Exchange.class).forEach(type -> types.addAll(typeRegistry.getTypeSpecificAttributes(type)));
+ final Set<String> unsupported = attributes.keySet()
+ .stream()
+ .filter(name -> types.stream().noneMatch(a -> Objects.equals(name, a.getName())
+ && !a.isDerived()))
+ .collect(Collectors.toSet());
+
+ if (!unsupported.isEmpty())
+ {
+ throw new IllegalArgumentException(String.format(
+ "Unsupported exchange declare arguments : %s", String.join(",", unsupported)));
+ }
+ }
+
private void validateAlternateExchangeIsNotQueue(final NamedAddressSpace addressSpace, final String alternateExchangeName)
{
MessageDestination alternateMessageDestination = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
index 02afaa8..d1fcd45 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExchangeInteraction.java
@@ -235,4 +235,10 @@
{
return _interaction.sendPerformative(_unbind);
}
+
+ public ExchangeInteraction declareArguments(final Map<String, Object> arguments)
+ {
+ _declare.setArguments(arguments);
+ return this;
+ }
}
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
new file mode 100644
index 0000000..a7a67d1
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.exchange;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class ExchangeTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ public void exchangeDeclareValidWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ SessionCompleted completed = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .exchange()
+ .declareExchange("test")
+ .declareArguments(Collections.singletonMap("unroutableMessageBehaviour", "REJECT"))
+ .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(SessionCompleted.class);
+
+ assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+ }
+ }
+
+ @Test
+ public void exchangeDeclareInvalidWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .exchange()
+ .declareExchange("test")
+ .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+ .declareId(0)
+ .declareArguments(Collections.singletonMap("foo", "bar"))
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ ExecutionException exception =
+ interaction.consume(ExecutionException.class, SessionCompleted.class, SessionCommandPoint.class);
+
+ assertThat(exception.getErrorCode(), is(equalTo(ExecutionErrorCode.ILLEGAL_ARGUMENT)));
+ }
+ }
+
+}
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
new file mode 100644
index 0000000..8ad178a
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.exchange;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class ExchangeTest extends BrokerAdminUsingTestBase
+{
+ private static final String TEST_EXCHANGE = "testExchange";
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ public void exchangeDeclareValidWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .exchange()
+ .declareName(TEST_EXCHANGE)
+ .declareArguments(Collections.singletonMap("unroutableMessageBehaviour", "REJECT"))
+ .declare()
+ .consumeResponse(ExchangeDeclareOkBody.class);
+
+ ExchangeBoundOkBody response = interaction.exchange()
+ .boundExchangeName(TEST_EXCHANGE)
+ .bound()
+ .consumeResponse()
+ .getLatestResponse(ExchangeBoundOkBody.class);
+ assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
+ }
+ }
+
+ @Test
+ public void exchangeDeclareInvalidWireArguments() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionCloseBody response = interaction.openAnonymousConnection()
+ .channel()
+ .open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .exchange()
+ .declareName(TEST_EXCHANGE)
+ .declareArguments(Collections.singletonMap("foo", "bar"))
+ .declare()
+ .consumeResponse(ConnectionCloseBody.class)
+ .getLatestResponse(ConnectionCloseBody.class);
+
+ assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.INVALID_ARGUMENT)));
+ }
+ }
+}