QPID-6028: [Java Broker] Ensure queue removal triggers exchange unbind
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 5987d40..438d00f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -248,7 +248,7 @@
final Map<String, Object> bindArguments =
UNBIND_ARGUMENTS_CREATOR.createMap(b.getBindingKey(), destination);
getEventLogger().message(_logSubject, BindingMessages.DELETED(String.valueOf(bindArguments)));
-
+ onUnbind(new BindingIdentifier(b.getBindingKey(), destination));
_bindings.remove(b);
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index a9e1d80..7c34fe8 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -69,6 +69,7 @@
private QueueManagingVirtualHost _virtualHost;
private TaskExecutor _taskExecutor;
+ @Override
public void setUp()
{
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -100,6 +101,8 @@
_exchange.open();
}
+ @Override
+
public void tearDown() throws Exception
{
super.tearDown();
@@ -125,30 +128,111 @@
public void testIsBoundStringMapAMQQueue()
{
- Queue<?> queue = bindQueue();
+ Queue<?> queue = bindQueue("matters");
assertTrue("Should return true for a bound queue",
_exchange.isBound("matters", null, queue));
}
public void testIsBoundStringAMQQueue()
{
- Queue<?> queue = bindQueue();
+ Queue<?> queue = bindQueue("matters");
assertTrue("Should return true for a bound queue",
_exchange.isBound("matters", queue));
}
public void testIsBoundAMQQueue()
{
- Queue<?> queue = bindQueue();
+ Queue<?> queue = bindQueue("matters");
assertTrue("Should return true for a bound queue",
_exchange.isBound(queue));
}
- private Queue<?> bindQueue()
+
+ public void testRouteToDestination() throws Exception
+ {
+ List<? extends BaseQueue> result;
+ Queue<?> queue = mockQueue();
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+
+ _exchange.addBinding("key", queue, null);
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+
+ _exchange.deleteBinding("key", queue);
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
+ }
+
+ public void testDestinationRemoved() throws Exception
+ {
+ List<? extends BaseQueue> result;
+ Queue<?> queue = mockQueue();
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+
+ _exchange.addBinding("key", queue, null);
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+
+ _exchange.destinationRemoved(queue);
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
+ }
+
+
+ public void testRoutingWithSelectors() throws Exception
{
Queue<?> queue = mockQueue();
- _exchange.addBinding("matters", queue, null);
+ List<? extends BaseQueue> result;
+
+ _exchange.addBinding("key2", queue, Collections.<String, Object>singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),"prop = True"));
+
+ result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+
+ assertEquals("Expected matching message to be routed to queue", 1, result.size());
+ assertTrue("Expected matching message to be routed to queue", result.contains(queue));
+
+ result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+
+ assertEquals("Expected non matching message not to be routed to queue", 0, result.size());
+ }
+
+ public void testMultipleBindings() throws Exception
+ {
+ Queue<?> queue1 = mockQueue();
+ Queue<?> queue2 = mockQueue();
+
+ List<? extends BaseQueue> result;
+
+ _exchange.addBinding("key", queue1, null);
+ _exchange.addBinding("key", queue2, null);
+
+ result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
+ assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+
+ _exchange.addBinding("key1", queue2, null);
+
+ result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+
+ assertEquals("Expected message to be routed to both queues", 2, result.size());
+ assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
+ assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+ }
+
+ private Queue<?> bindQueue(final String bindingKey)
+ {
+ Queue<?> queue = mockQueue();
+
+ _exchange.addBinding(bindingKey, queue, null);
return queue;
}
@@ -171,54 +255,6 @@
return queue;
}
- public void testRoutingWithSelectors() throws Exception
- {
- Queue<?> queue1 = mockQueue();
- Queue<?> queue2 = mockQueue();
-
-
- _exchange.addBinding("key",queue1, null);
- _exchange.addBinding("key",queue2, null);
-
- List<? extends BaseQueue> result;
- result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
-
- assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
-
- _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
-
- result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
-
- assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
-
- _exchange.deleteBinding("key",queue2);
-
- result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
-
- assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
-
- result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
-
- assertEquals("Expected message to be routed to queue1 only", 1, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
-
- _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
-
- result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
- assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
-
-
- }
-
private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
final String routingAddress,
final InstanceProperties instanceProperties)
@@ -301,12 +337,12 @@
return resultQueues;
}
- private ServerMessage mockMessage(boolean val)
+ private ServerMessage mockMessage(boolean propValue)
{
final AMQMessageHeader header = mock(AMQMessageHeader.class);
- when(header.containsHeader("select")).thenReturn(true);
- when(header.getHeader("select")).thenReturn(val);
- when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+ when(header.containsHeader("prop")).thenReturn(true);
+ when(header.getHeader("prop")).thenReturn(propValue);
+ when(header.getHeaderNames()).thenReturn(Collections.singleton("prop"));
when(header.containsHeaders(anySet())).then(new Answer<Object>()
{
@Override