QPID-6028 : [Java Broker] PublishingLink is a generalisation of Binding
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index fb4e717..8964bfe 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -62,6 +62,7 @@
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Param;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
@@ -172,7 +173,7 @@
if (messageDestination != null)
{
onBind(new BindingIdentifier(b.getBindingKey(), messageDestination), b.getArguments());
- messageDestination.linkAdded(this, b.getBindingKey());
+ messageDestination.linkAdded(this, b);
}
}
}
@@ -211,7 +212,7 @@
final MessageDestination messageDestination = getAttainedMessageDestination(b.getDestination());
if(messageDestination != null)
{
- messageDestination.linkRemoved(this, b.getBindingKey());
+ messageDestination.linkRemoved(this, b);
}
}
for(MessageSender sender : _linkedSenders.keySet())
@@ -646,13 +647,13 @@
getEventLogger().message(_logSubject, BindingMessages.CREATED(String.valueOf(bindArguments)));
onBind(bindingIdentifier, arguments);
- messageDestination.linkAdded(this, bindingKey);
+ messageDestination.linkAdded(this, newBinding);
}
return true;
}
@Override
- public Collection<Binding> getBindingsForDestination(MessageDestination destination)
+ public Collection<Binding> getPublishingLinks(MessageDestination destination)
{
List<Binding> bindings = new ArrayList<>();
final String destinationName = destination.getName();
@@ -727,7 +728,7 @@
if(binding.getBindingKey().equals(bindingKey) && binding.getDestination().equals(destination))
{
_bindings.remove(binding);
- messageDestination.linkRemoved(this, bindingKey);
+ messageDestination.linkRemoved(this, binding);
onUnbind(new BindingIdentifier(bindingKey, messageDestination));
if(!autoDeleteIfNecessary())
{
@@ -946,7 +947,7 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
if(oldValue != null)
@@ -956,7 +957,7 @@
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
int oldValue = _linkedSenders.remove(sender);
if(oldValue != 1)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
index 9855d9b..89afbce 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -31,6 +31,7 @@
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.PermissionedObject;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.SecurityToken;
@@ -139,13 +140,13 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 107bed0..8fc5cc0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -57,6 +58,6 @@
boolean isDurable();
- void linkAdded(MessageSender sender, String linkName);
- void linkRemoved(MessageSender sender, String linkName);
+ void linkAdded(MessageSender sender, PublishingLink link);
+ void linkRemoved(MessageSender sender, PublishingLink link);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java
index 6197571..ef867e9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSender.java
@@ -20,7 +20,13 @@
*/
package org.apache.qpid.server.message;
+import java.util.Collection;
+
+import org.apache.qpid.server.model.PublishingLink;
+
public interface MessageSender
{
void destinationRemoved(MessageDestination destination);
+
+ Collection<? extends PublishingLink> getPublishingLinks(MessageDestination destination);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 5e6e880..c13a94d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -23,9 +23,8 @@
import java.util.Map;
@ManagedAttributeValueType
-public interface Binding extends ManagedAttributeValue
+public interface Binding extends PublishingLink
{
String getBindingKey();
- String getDestination();
Map<String,Object> getArguments();
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 84a18da..ba4fae9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -59,7 +59,8 @@
@DerivedAttribute
Collection<Binding> getBindings();
- Collection<Binding> getBindingsForDestination(MessageDestination destination);
+ @Override
+ Collection<Binding> getPublishingLinks(MessageDestination destination);
@DerivedAttribute(persist = true)
Collection<Binding> getDurableBindings();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java b/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java
new file mode 100644
index 0000000..8629ca6
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/PublishingLink.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedAttributeValueType
+public interface PublishingLink extends ManagedAttributeValue
+{
+ String getDestination();
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 6e05558..db0936b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -259,7 +259,7 @@
boolean isHoldOnPublishEnabled();
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
- Collection<Binding> getBindings();
+ Collection<PublishingLink> getPublishingLinks();
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 1bec176..fb37b56 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1034,15 +1034,15 @@
}
- public Collection<Binding> getBindings()
+ public Collection<PublishingLink> getPublishingLinks()
{
- List<Binding> bindings = new ArrayList<>();
+ List<PublishingLink> links = new ArrayList<>();
for(MessageSender sender : _linkedSenders.keySet())
{
- //TODO - eliminate cast
- bindings.addAll(((Exchange)sender).getBindingsForDestination(this));
+ final Collection<? extends PublishingLink> linksForDestination = sender.getPublishingLinks(this);
+ links.addAll(linksForDestination);
}
- return bindings;
+ return links;
}
@Override
@@ -3397,7 +3397,7 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
@@ -3409,7 +3409,7 @@
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
int oldValue = _linkedSenders.remove(sender);
if(oldValue != 1)
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 7830d43..dede752 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -153,8 +153,8 @@
assertTrue("Queue was not bound to key",
_exchange.isBound(_routingKey,_queue));
assertEquals("Exchange binding count", 1,
- _queue.getBindings().size());
- final Binding firstBinding = _queue.getBindings().iterator().next();
+ _queue.getPublishingLinks().size());
+ final Binding firstBinding = (Binding) _queue.getPublishingLinks().iterator().next();
assertEquals("Wrong binding key", _routingKey,
firstBinding.getBindingKey());
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 9285d28..8e4e609 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -50,6 +50,7 @@
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -377,13 +378,13 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index e27c32a..311dff5 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -77,6 +77,7 @@
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.OperationParameter;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
@@ -382,13 +383,13 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index a6e0a39..174f1c6 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -35,6 +35,7 @@
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.security.SecurityToken;
@@ -158,13 +159,13 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 99eea15..2f43170 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -50,6 +50,7 @@
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageDurability;
@@ -109,13 +110,13 @@
}
@Override
- public void linkAdded(final MessageSender sender, final String linkName)
+ public void linkAdded(final MessageSender sender, final PublishingLink link)
{
}
@Override
- public void linkRemoved(final MessageSender sender, final String linkName)
+ public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
}
diff --git a/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 773d971..c883a11 100644
--- a/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -441,13 +441,13 @@
bindQueueToExchange(exch, directRouting, _virtualHost.getChildByName(Queue.class, durableQueueName), false);
assertEquals("Incorrect number of bindings registered before recovery",
- 1, _virtualHost.getChildByName(Queue.class, durableQueueName).getBindings().size());
+ 1, _virtualHost.getChildByName(Queue.class, durableQueueName).getPublishingLinks().size());
//verify binding is actually normally recovered
reloadVirtualHost();
assertEquals("Incorrect number of bindings registered after first recovery",
- 1, _virtualHost.getChildByName(Queue.class, durableQueueName).getBindings().size());
+ 1, _virtualHost.getChildByName(Queue.class, durableQueueName).getPublishingLinks().size());
exch = (Exchange<?>) _virtualHost.getChildByName(Exchange.class, directExchangeName);
assertNotNull("Exchange was not recovered", exch);
@@ -459,7 +459,7 @@
reloadVirtualHost();
assertEquals("Incorrect number of bindings registered after second recovery",
- 0, _virtualHost.getChildByName(Queue.class, durableQueueName).getBindings().size());
+ 0, _virtualHost.getChildByName(Queue.class, durableQueueName).getPublishingLinks().size());
}
/**
@@ -499,11 +499,11 @@
assertEquals("Incorrect number of (durable) queues following recovery", 6, _virtualHost.getChildren(Queue.class).size());
- validateBindingProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName).getBindings(), false);
- validateBindingProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName).getBindings(), true);
- validateBindingProperties(_virtualHost.getChildByName(Queue.class, durableQueueName).getBindings(), false);
- validateBindingProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName).getBindings(), true);
- validateBindingProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName).getBindings(), false);
+ validateBindingProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName).getPublishingLinks(), false);
+ validateBindingProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName).getPublishingLinks(), true);
+ validateBindingProperties(_virtualHost.getChildByName(Queue.class, durableQueueName).getPublishingLinks(), false);
+ validateBindingProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName).getPublishingLinks(), true);
+ validateBindingProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName).getPublishingLinks(), false);
}
/**