blob: 02cdd479f652295283b3b893182c049022ff7463 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.exchange;
import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownAlternateBindingException;
import org.apache.qpid.test.utils.UnitTestBase;
public class DirectExchangeTest extends UnitTestBase
{
private DirectExchange<?> _exchange;
private VirtualHost<?> _vhost;
private InstanceProperties _instanceProperties;
private ServerMessage<?> _messageWithNoHeaders;
@Before
public void setUp() throws Exception
{
BrokerTestHelper.setUp();
_vhost = BrokerTestHelper.createVirtualHost(getTestName(), this);
Map<String,Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, "test");
attributes.put(Exchange.DURABLE, false);
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
_exchange = (DirectExchange<?>) _vhost.createChild(Exchange.class, attributes);
_instanceProperties = mock(InstanceProperties.class);
_messageWithNoHeaders = createTestMessage(Collections.emptyMap());
}
@After
public void tearDown() throws Exception
{
try
{
if (_vhost != null)
{
_vhost.close();
}
}
finally
{
BrokerTestHelper.tearDown();
}
}
@Test
public void testCreationOfExchangeWithReservedExchangePrefixRejected() throws Exception
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, "amq.wibble");
attributes.put(Exchange.DURABLE, false);
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
try
{
_exchange = (DirectExchangeImpl) _vhost.createChild(Exchange.class, attributes);
_exchange.open();
fail("Exception not thrown");
}
catch (ReservedExchangeNameException rene)
{
// PASS
}
}
@Test
public void testAmqpDirectRecreationRejected() throws Exception
{
DirectExchangeImpl amqpDirect = (DirectExchangeImpl) _vhost.getChildByName(Exchange.class, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
assertNotNull(amqpDirect);
assertSame(amqpDirect, _vhost.getChildById(Exchange.class, amqpDirect.getId()));
assertSame(amqpDirect, _vhost.getChildByName(Exchange.class, amqpDirect.getName()));
Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
attributes.put(Exchange.DURABLE, true);
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
try
{
_exchange = (DirectExchangeImpl) _vhost.createChild(Exchange.class, attributes);
_exchange.open();
fail("Exception not thrown");
}
catch (ReservedExchangeNameException rene)
{
// PASS
}
// QPID-6599, defect would mean that the existing exchange was removed from the in memory model.
assertSame(amqpDirect, _vhost.getChildById(Exchange.class, amqpDirect.getId()));
assertSame(amqpDirect, _vhost.getChildByName(Exchange.class, amqpDirect.getName()));
}
@Test
public void testDeleteOfExchangeSetAsAlternate() throws Exception
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Queue.NAME, getTestName());
attributes.put(Queue.DURABLE, false);
attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
Queue queue = _vhost.createChild(Queue.class, attributes);
queue.open();
assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateBindingDestination());
try
{
_exchange.delete();
fail("Exchange deletion should fail with MessageDestinationIsAlternateException");
}
catch(MessageDestinationIsAlternateException e)
{
// pass
}
assertEquals("Unexpected effective exchange state", State.ACTIVE, _exchange.getState());
assertEquals("Unexpected desired exchange state", State.ACTIVE, _exchange.getDesiredState());
}
@Test
public void testAlternateBindingValidationRejectsNonExistingDestination()
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, getTestName());
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
String alternateBinding = "nonExisting";
attributes.put(Exchange.ALTERNATE_BINDING,
Collections.singletonMap(AlternateBinding.DESTINATION, alternateBinding));
try
{
_vhost.createChild(Exchange.class, attributes);
fail("Expected exception is not thrown");
}
catch (UnknownAlternateBindingException e)
{
assertEquals("Unexpected exception alternate binding", alternateBinding, e.getAlternateBindingName());
}
}
@Test
public void testAlternateBindingValidationRejectsSelf()
{
Map<String, String> alternateBinding = Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName());
Map<String, Object> newAttributes = Collections.singletonMap(Exchange.ALTERNATE_BINDING, alternateBinding);
try
{
_exchange.setAttributes(newAttributes);
fail("Expected exception is not thrown");
}
catch (IllegalConfigurationException e)
{
// pass
}
}
@Test
public void testDurableExchangeRejectsNonDurableAlternateBinding()
{
Map<String, Object> dlqAttributes = new HashMap<>();
String dlqName = getTestName() + "_DLQ";
dlqAttributes.put(Queue.NAME, dlqName);
dlqAttributes.put(Queue.DURABLE, false);
_vhost.createChild(Queue.class, dlqAttributes);
Map<String, Object> exchangeAttributes = new HashMap<>();
exchangeAttributes.put(Exchange.NAME, getTestName());
exchangeAttributes.put(Exchange.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, dlqName));
exchangeAttributes.put(Exchange.DURABLE, true);
exchangeAttributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
try
{
_vhost.createChild(Exchange.class, exchangeAttributes);
fail("Expected exception is not thrown");
}
catch (IllegalConfigurationException e)
{
// pass
}
}
@Test
public void testAlternateBinding()
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, getTestName());
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
attributes.put(Exchange.ALTERNATE_BINDING,
Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
attributes.put(Exchange.DURABLE, false);
Exchange newExchange = _vhost.createChild(Exchange.class, attributes);
assertEquals("Unexpected alternate binding",
_exchange.getName(),
newExchange.getAlternateBinding().getDestination());
}
@Test
public void testRouteToQueue()
{
String boundKey = "key";
Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, boundKey,
_instanceProperties);
assertFalse("Message unexpectedly routed to queue before bind", result.hasRoutes());
boolean bind = _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
assertTrue("Bind operation should be successful", bind);
result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
assertTrue("Message unexpectedly not routed to queue after bind", result.hasRoutes());
boolean unbind = _exchange.unbind(queue.getName(), boundKey);
assertTrue("Unbind operation should be successful", unbind);
result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
assertFalse("Message unexpectedly routed to queue after unbind", result.hasRoutes());
}
@Test
public void testRouteToQueueWithSelector()
{
String boundKey = "key";
Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
InstanceProperties instanceProperties = mock(InstanceProperties.class);
ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
ServerMessage<?> unmatchingMessage = createTestMessage(Collections.singletonMap("prop", false));
boolean bind = _exchange.bind(queue.getName(), boundKey,
Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
false);
assertTrue("Bind operation should be successful", bind);
RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, boundKey, instanceProperties);
assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
result = _exchange.route(unmatchingMessage, boundKey, instanceProperties);
assertFalse("Message with matching selector unexpectedly routed to queue", result.hasRoutes());
boolean unbind = _exchange.unbind(queue.getName(), boundKey);
assertTrue("Unbind operation should be successful", unbind);
result = _exchange.route(matchingMessage, boundKey, instanceProperties);
assertFalse("Message with matching selector unexpectedly routed to queue after unbind",
result.hasRoutes());
}
@Test
public void testRouteToQueueViaTwoExchanges()
{
String boundKey = "key";
Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, getTestName());
attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Exchange via = _vhost.createChild(Exchange.class, attributes);
Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
boolean exchToViaBind = _exchange.bind(via.getName(), boundKey, Collections.emptyMap(), false);
assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
boolean viaToQueueBind = via.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
boundKey,
_instanceProperties);
assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
}
@Test
public void testDestinationDeleted()
{
String boundKey = "key";
Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
assertFalse(_exchange.isBound(boundKey));
assertFalse(_exchange.isBound(boundKey, queue));
assertFalse(_exchange.isBound(queue));
_exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
assertTrue(_exchange.isBound(boundKey));
assertTrue(_exchange.isBound(boundKey, queue));
assertTrue(_exchange.isBound(queue));
queue.delete();
assertFalse(_exchange.isBound(boundKey));
assertFalse(_exchange.isBound(boundKey, queue));
assertFalse(_exchange.isBound(queue));
}
private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
@SuppressWarnings("unchecked")
ServerMessage<?> message = mock(ServerMessage.class);
when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
when(message.getMessageHeader()).thenReturn(header);
return message;
}
@Test
public void testRouteToMultipleQueues()
{
String boundKey = "key";
Queue<?> queue1 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
Queue<?> queue2 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
boolean bind1 = _exchange.bind(queue1.getName(), boundKey, Collections.emptyMap(), false);
assertTrue("Bind operation to queue1 should be successful", bind1);
RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
assertEquals("Message routed to unexpected number of queues",
(long) 1,
(long) result.getNumberOfRoutes());
_exchange.bind(queue2.getName(), boundKey, Collections.singletonMap(JMS_SELECTOR.toString(), "prop is null"), false);
result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
assertEquals("Message routed to unexpected number of queues",
(long) 2,
(long) result.getNumberOfRoutes());
_exchange.unbind(queue1.getName(), boundKey);
result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
assertEquals("Message routed to unexpected number of queues",
(long) 1,
(long) result.getNumberOfRoutes());
_exchange.unbind(queue2.getName(), boundKey);
result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
assertEquals("Message routed to unexpected number of queues",
(long) 0,
(long) result.getNumberOfRoutes());
}
@Test
public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
{
Map<String, Object> viaExchangeArguments = new HashMap<>();
viaExchangeArguments.put(Exchange.NAME, "via_exchange");
viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
boolean exchToViaBind = _exchange.bind(via.getName(),
"key1",
Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2"),
false);
assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
"key1",
_instanceProperties);
assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
}
@Test
public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
{
Map<String, Object> viaExchangeArguments = new HashMap<>();
viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
Map<String, Object> exchToViaBindArguments = new HashMap<>();
exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2");
exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
boolean exchToViaBind = _exchange.bind(via.getName(),
"key1",
exchToViaBindArguments,
false);
assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
RoutingResult<ServerMessage<?>> result = _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
"key1",
_instanceProperties);
assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
"key1",
_instanceProperties);
assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
}
@Test
public void testBindWithInvalidSelector()
{
final String queueName = getTestName() + "_queue";
_vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
try
{
_exchange.bind(queueName, queueName, bindArguments, false);
fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
}
catch (IllegalArgumentException e)
{
// pass
}
final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
assertFalse("Message is unexpectedly routed to queue", result.hasRoutes());
}
@Test
public void testBindWithInvalidSelectorWhenBindingExists()
{
final String queueName = getTestName() + "_queue";
_vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, queueName));
final Map<String, Object> bindArguments = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in ('bar')");
final boolean isBound = _exchange.bind(queueName, queueName, bindArguments, false);
assertTrue("Could not bind queue", isBound);
final ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("foo", "bar"));
final RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage, queueName, _instanceProperties);
assertTrue("Message should be routed to queue", result.hasRoutes());
final Map<String, Object> bindArguments2 = Collections.singletonMap(JMS_SELECTOR.toString(), "foo in (");
try
{
_exchange.bind(queueName, queueName, bindArguments2, true);
fail("Queue can be bound when invalid selector expression is supplied as part of bind arguments");
}
catch (IllegalArgumentException e)
{
// pass
}
final RoutingResult<ServerMessage<?>> result2 = _exchange.route(testMessage, queueName, _instanceProperties);
assertTrue("Message should be be possible to route using old binding", result2.hasRoutes());
}
}