blob: 60fff56d713501018aae7a50898bcd6b6e2c392f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.exchange;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.messages.SenderMessages;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredDerivedMethodAttribute;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.DoOnConfigThread;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Param;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.queue.CreatingLinkInfo;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.DeleteDeleteTask;
import org.apache.qpid.server.util.FixedKeyMapCreator;
import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownAlternateBindingException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public abstract class AbstractExchange<T extends AbstractExchange<T>>
extends AbstractConfiguredObject<T>
implements Exchange<T>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExchange.class);
private static final ThreadLocal<Map<AbstractExchange<?>, Set<String>>> CURRENT_ROUTING = new ThreadLocal<>();
private static final FixedKeyMapCreator BIND_ARGUMENTS_CREATOR =
new FixedKeyMapCreator("bindingKey", "destination", "arguments");
private static final FixedKeyMapCreator UNBIND_ARGUMENTS_CREATOR =
new FixedKeyMapCreator("bindingKey", "destination");
private static final Operation PUBLISH_ACTION = Operation.PERFORM_ACTION("publish");
private final AtomicBoolean _closed = new AtomicBoolean();
@ManagedAttributeField(beforeSet = "preSetAlternateBinding", afterSet = "postSetAlternateBinding" )
private AlternateBinding _alternateBinding;
@ManagedAttributeField
private UnroutableMessageBehaviour _unroutableMessageBehaviour;
@ManagedAttributeField
private CreatingLinkInfo _creatingLinkInfo;
private QueueManagingVirtualHost<?> _virtualHost;
/**
* Whether the exchange is automatically deleted once all queues have detached from it
*/
private boolean _autoDelete;
//The logSubject for ths exchange
private LogSubject _logSubject;
private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
private final AtomicLong _receivedMessageCount = new AtomicLong();
private final AtomicLong _receivedMessageSize = new AtomicLong();
private final AtomicLong _routedMessageCount = new AtomicLong();
private final AtomicLong _routedMessageSize = new AtomicLong();
private final AtomicLong _droppedMessageCount = new AtomicLong();
private final AtomicLong _droppedMessageSize = new AtomicLong();
private final List<Binding> _bindings = new CopyOnWriteArrayList<>();
private final ConcurrentMap<MessageSender, Integer> _linkedSenders = new ConcurrentHashMap<>();
private final List<Action<? super Deletable<?>>> _deleteTaskList = new CopyOnWriteArrayList<>();
private volatile MessageDestination _alternateBindingDestination;
public AbstractExchange(Map<String, Object> attributes, QueueManagingVirtualHost<?> vhost)
{
super(vhost, attributes);
Set<String> providedAttributeNames = new HashSet<>(attributes.keySet());
providedAttributeNames.removeAll(getAttributeNames());
if(!providedAttributeNames.isEmpty())
{
throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames);
}
_virtualHost = vhost;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
}
@Override
public void onValidate()
{
super.onValidate();
if(!isSystemProcess())
{
if (isReservedExchangeName(getName()))
{
throw new ReservedExchangeNameException(getName());
}
}
}
@Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
validateOrCreateAlternateBinding(((Exchange<?>) proxyForValidation), false);
if (changedAttributes.contains(ConfiguredObject.DESIRED_STATE) && proxyForValidation.getDesiredState() == State.DELETED)
{
doChecks();
}
}
private boolean isReservedExchangeName(String name)
{
return name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)
|| name.startsWith("amq.") || name.startsWith("qpid.");
}
@Override
protected void validateOnCreate()
{
super.validateOnCreate();
if (getCreatingLinkInfo() != null && !isSystemProcess())
{
throw new IllegalConfigurationException(String.format("Cannot specify creatingLinkInfo for exchange '%s'", getName()));
}
}
@Override
protected void onCreate()
{
super.onCreate();
validateOrCreateAlternateBinding(this, true);
}
@Override
protected void onOpen()
{
super.onOpen();
final ConfiguredDerivedMethodAttribute<Exchange<?>, Collection<Binding>> durableBindingsAttribute =
(ConfiguredDerivedMethodAttribute<Exchange<?>, Collection<Binding>>) getModel().getTypeRegistry().getAttributeTypes(getTypeClass()).get(DURABLE_BINDINGS);
final Collection<Binding> bindings =
durableBindingsAttribute.convertValue(getActualAttributes().get(DURABLE_BINDINGS), this);
if (bindings != null)
{
_bindings.addAll(bindings);
for (Binding b : _bindings)
{
final MessageDestination messageDestination = getOpenedMessageDestination(b.getDestination());
if (messageDestination != null)
{
Map<String, Object> arguments = b.getArguments() == null ? Collections.emptyMap() : b.getArguments();
try
{
onBind(new BindingIdentifier(b.getBindingKey(), messageDestination), arguments);
}
catch (AMQInvalidArgumentException e)
{
throw new IllegalConfigurationException("Unexpected bind argument : " + e.getMessage(), e);
}
messageDestination.linkAdded(this, b);
}
}
}
if (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CREATING_LINK_CLOSE)
{
if (_creatingLinkInfo != null)
{
final LinkModel link;
if (_creatingLinkInfo.isSendingLink())
{
link = _virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(), _creatingLinkInfo.getLinkName());
}
else
{
link = _virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(), _creatingLinkInfo.getLinkName());
}
addLifetimeConstraint(link);
}
else
{
throw new IllegalArgumentException("Exchanges created with a lifetime policy of "
+ getLifetimePolicy()
+ " must be created from a AMQP 1.0 link.");
}
}
if (getAlternateBinding() != null)
{
String alternateDestination = getAlternateBinding().getDestination();
_alternateBindingDestination = getOpenedMessageDestination(alternateDestination);
if (_alternateBindingDestination != null)
{
_alternateBindingDestination.addReference(this);
}
else
{
LOGGER.warn("Cannot find alternate binding destination '{}' for exchange '{}'", alternateDestination, toString());
}
}
getEventLogger().message(ExchangeMessages.CREATED(getType(), getName(), isDurable()));
}
@Override
public EventLogger getEventLogger()
{
return _virtualHost.getEventLogger();
}
private void performDeleteTasks()
{
for (Action<? super Deletable<?>> task : _deleteTaskList)
{
task.performAction(null);
}
_deleteTaskList.clear();
}
@Override
public boolean isAutoDelete()
{
return getLifetimePolicy() != LifetimePolicy.PERMANENT;
}
private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
{
final Action<Deletable> deleteExchangeTask = object -> Subject.doAs(getSubjectWithAddedSystemRights(),
(PrivilegedAction<Void>) () ->
{
AbstractExchange.this.delete();
return null;
});
lifetimeObject.addDeleteTask(deleteExchangeTask);
_deleteTaskList.add(new DeleteDeleteTask(lifetimeObject, deleteExchangeTask));
}
private void performDelete()
{
if(_closed.compareAndSet(false,true))
{
performDeleteTasks();
for(Binding b : _bindings)
{
final MessageDestination messageDestination = getAttainedMessageDestination(b.getDestination());
if(messageDestination != null)
{
messageDestination.linkRemoved(this, b);
}
}
for(MessageSender sender : _linkedSenders.keySet())
{
sender.destinationRemoved(this);
}
if (_alternateBindingDestination != null)
{
_alternateBindingDestination.removeReference(AbstractExchange.this);
}
getEventLogger().message(_logSubject, ExchangeMessages.DELETED());
}
}
private void doChecks()
{
if(hasReferrers())
{
throw new MessageDestinationIsAlternateException(getName());
}
if(isReservedExchangeName(getName()))
{
throw new RequiredExchangeException(getName());
}
}
@Override
@DoOnConfigThread
public void destinationRemoved(@Param(name="destination") final MessageDestination destination)
{
Iterator<Binding> bindingIterator = _bindings.iterator();
while(bindingIterator.hasNext())
{
Binding b = bindingIterator.next();
if(b.getDestination().equals(destination.getName()))
{
final Map<String, Object> bindArguments =
UNBIND_ARGUMENTS_CREATOR.createMap(b.getBindingKey(), destination);
getEventLogger().message(_logSubject, BindingMessages.DELETED(String.valueOf(bindArguments)));
onUnbind(new BindingIdentifier(b.getBindingKey(), destination));
_bindings.remove(b);
}
}
if(!autoDeleteIfNecessary())
{
if (destination.isDurable() && isDurable())
{
final Collection<Binding> durableBindings = getDurableBindings();
attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
}
}
}
@Override
public UnroutableMessageBehaviour getUnroutableMessageBehaviour()
{
return _unroutableMessageBehaviour;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "[" + getName() +"]";
}
@Override
public QueueManagingVirtualHost<?> getVirtualHost()
{
return _virtualHost;
}
@Override
public boolean isBound(String bindingKey, Map<String,Object> arguments, Queue<?> queue)
{
if (bindingKey == null)
{
bindingKey = "";
}
for(Binding b : _bindings)
{
if(bindingKey.equals(b.getBindingKey()) && queue.getName().equals(b.getDestination()))
{
return (b.getArguments() == null || b.getArguments().isEmpty())
? (arguments == null || arguments.isEmpty())
: b.getArguments().equals(arguments);
}
}
return false;
}
@Override
public boolean isBound(String bindingKey, Queue<?> queue)
{
if (bindingKey == null)
{
bindingKey = "";
}
for(Binding b : _bindings)
{
if(bindingKey.equals(b.getBindingKey()) && queue.getName().equals(b.getDestination()))
{
return true;
}
}
return false;
}
@Override
public boolean isBound(String bindingKey)
{
if (bindingKey == null)
{
bindingKey = "";
}
for(Binding b : _bindings)
{
if(bindingKey.equals(b.getBindingKey()))
{
return true;
}
}
return false;
}
@Override
public boolean isBound(Queue<?> queue)
{
for(Binding b : _bindings)
{
if(queue.getName().equals(b.getDestination()))
{
return true;
}
}
return false;
}
@Override
public boolean isBound(Map<String, Object> arguments, Queue<?> queue)
{
for(Binding b : _bindings)
{
if(queue.getName().equals(b.getDestination()) &&
((b.getArguments() == null || b.getArguments().isEmpty())
? (arguments == null || arguments.isEmpty())
: b.getArguments().equals(arguments)))
{
return true;
}
}
return false;
}
@Override
public boolean isBound(Map<String, Object> arguments)
{
for(Binding b : _bindings)
{
if(((b.getArguments() == null || b.getArguments().isEmpty())
? (arguments == null || arguments.isEmpty())
: b.getArguments().equals(arguments)))
{
return true;
}
}
return false;
}
@Override
public boolean isBound(String bindingKey, Map<String, Object> arguments)
{
if (bindingKey == null)
{
bindingKey = "";
}
for(Binding b : _bindings)
{
if(b.getBindingKey().equals(bindingKey) &&
((b.getArguments() == null || b.getArguments().isEmpty())
? (arguments == null || arguments.isEmpty())
: b.getArguments().equals(arguments)))
{
return true;
}
}
return false;
}
@Override
public boolean hasBindings()
{
return !_bindings.isEmpty();
}
@Override
public AlternateBinding getAlternateBinding()
{
return _alternateBinding;
}
private void preSetAlternateBinding()
{
if (_alternateBindingDestination != null)
{
_alternateBindingDestination.removeReference(this);
}
}
@SuppressWarnings("unused")
private void postSetAlternateBinding()
{
if(_alternateBinding != null)
{
_alternateBindingDestination = getOpenedMessageDestination(_alternateBinding.getDestination());
if (_alternateBindingDestination != null)
{
_alternateBindingDestination.addReference(this);
}
}
}
@Override
public MessageDestination getAlternateBindingDestination()
{
return _alternateBindingDestination;
}
@Override
public void removeReference(DestinationReferrer destinationReferrer)
{
_referrers.remove(destinationReferrer);
}
@Override
public void addReference(DestinationReferrer destinationReferrer)
{
_referrers.add(destinationReferrer);
}
private boolean hasReferrers()
{
return !_referrers.isEmpty();
}
@Override
public Collection<Binding> getBindings()
{
return Collections.unmodifiableList(_bindings);
}
protected abstract void onBindingUpdated(final BindingIdentifier binding,
final Map<String, Object> newArguments) throws AMQInvalidArgumentException;
protected abstract void onBind(final BindingIdentifier binding, final Map<String, Object> arguments)
throws AMQInvalidArgumentException;
protected abstract void onUnbind(final BindingIdentifier binding);
@Override
public long getBindingCount()
{
return getBindings().size();
}
@Override
public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message,
final String routingAddress,
final InstanceProperties instanceProperties)
{
if (_virtualHost.getState() != State.ACTIVE)
{
throw new VirtualHostUnavailableException(this._virtualHost);
}
final RoutingResult<M> routingResult = new RoutingResult<>(message);
Map<AbstractExchange<?>, Set<String>> currentThreadMap = CURRENT_ROUTING.get();
boolean topLevel = currentThreadMap == null;
try
{
if (topLevel)
{
currentThreadMap = new HashMap<>();
CURRENT_ROUTING.set(currentThreadMap);
}
Set<String> existingRoutes = currentThreadMap.get(this);
if (existingRoutes == null)
{
currentThreadMap.put(this, Collections.singleton(routingAddress));
}
else if (existingRoutes.contains(routingAddress))
{
return routingResult;
}
else
{
existingRoutes = new HashSet<>(existingRoutes);
existingRoutes.add(routingAddress);
currentThreadMap.put(this, existingRoutes);
}
_receivedMessageCount.incrementAndGet();
long sizeIncludingHeader = message.getSizeIncludingHeader();
_receivedMessageSize.addAndGet(sizeIncludingHeader);
doRoute(message, routingAddress, instanceProperties, routingResult);
if (!routingResult.hasRoutes())
{
MessageDestination alternateBindingDestination = getAlternateBindingDestination();
if (alternateBindingDestination != null)
{
routingResult.add(alternateBindingDestination.route(message, routingAddress, instanceProperties));
}
}
if (routingResult.hasRoutes())
{
_routedMessageCount.incrementAndGet();
_routedMessageSize.addAndGet(sizeIncludingHeader);
}
else
{
_droppedMessageCount.incrementAndGet();
_droppedMessageSize.addAndGet(sizeIncludingHeader);
}
return routingResult;
}
finally
{
if(topLevel)
{
CURRENT_ROUTING.set(null);
}
}
}
protected abstract <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(final M message,
final String routingAddress,
final InstanceProperties instanceProperties,
final RoutingResult<M> result);
@Override
public boolean bind(final String destination,
String bindingKey,
Map<String, Object> arguments,
boolean replaceExistingArguments)
{
try
{
return bindInternal(destination, bindingKey, arguments, replaceExistingArguments);
}
catch (AMQInvalidArgumentException e)
{
throw new IllegalArgumentException("Unexpected bind argument : " + e.getMessage(), e);
}
}
private boolean bindInternal(final String destination,
final String bindingKey,
Map<String, Object> arguments,
final boolean replaceExistingArguments) throws AMQInvalidArgumentException
{
MessageDestination messageDestination = getAttainedMessageDestination(destination);
if (messageDestination == null)
{
throw new IllegalArgumentException(String.format("Destination '%s' is not found.", destination));
}
if(arguments == null)
{
arguments = Collections.emptyMap();
}
Binding newBinding = new BindingImpl(bindingKey, destination, arguments);
Binding previousBinding = null;
for(Binding b : _bindings)
{
if (b.getBindingKey().equals(bindingKey) && b.getDestination().equals(messageDestination.getName()))
{
previousBinding = b;
break;
}
}
if (previousBinding != null && !replaceExistingArguments)
{
return false;
}
final BindingIdentifier bindingIdentifier = new BindingIdentifier(bindingKey, messageDestination);
if(previousBinding != null)
{
onBindingUpdated(bindingIdentifier, arguments);
}
else
{
final Map<String, Object> bindArguments =
BIND_ARGUMENTS_CREATOR.createMap(bindingKey, destination, arguments);
getEventLogger().message(_logSubject, BindingMessages.CREATED(String.valueOf(bindArguments)));
onBind(bindingIdentifier, arguments);
messageDestination.linkAdded(this, newBinding);
}
if (previousBinding != null)
{
_bindings.remove(previousBinding);
}
_bindings.add(newBinding);
if(isDurable() && messageDestination.isDurable())
{
final Collection<Binding> durableBindings = getDurableBindings();
attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
}
return true;
}
@Override
public Collection<Binding> getPublishingLinks(MessageDestination destination)
{
List<Binding> bindings = new ArrayList<>();
final String destinationName = destination.getName();
for(Binding b : _bindings)
{
if(b.getDestination().equals(destinationName))
{
bindings.add(b);
}
}
return bindings;
}
@Override
public Collection<Binding> getDurableBindings()
{
List<Binding> durableBindings;
if(isDurable())
{
durableBindings = new ArrayList<>();
for (Binding b : _bindings)
{
MessageDestination destination = getAttainedMessageDestination(b.getDestination());
if(destination != null && destination.isDurable())
{
durableBindings.add(b);
}
}
}
else
{
durableBindings = Collections.emptyList();
}
return durableBindings;
}
@Override
public CreatingLinkInfo getCreatingLinkInfo()
{
return _creatingLinkInfo;
}
private MessageDestination getAttainedMessageDestination(final String name)
{
MessageDestination destination = getVirtualHost().getAttainedQueue(name);
return destination == null ? getVirtualHost().getAttainedMessageDestination(name, false) : destination;
}
private MessageDestination getOpenedMessageDestination(final String name)
{
MessageDestination destination = getVirtualHost().getSystemDestination(name);
if(destination == null)
{
destination = getVirtualHost().getChildByName(Exchange.class, name);
}
if(destination == null)
{
destination = getVirtualHost().getChildByName(Queue.class, name);
}
return destination;
}
@Override
public boolean unbind(@Param(name = "destination", mandatory = true) final String destination,
@Param(name = "bindingKey") String bindingKey)
{
MessageDestination messageDestination = getAttainedMessageDestination(destination);
if (messageDestination != null)
{
Iterator<Binding> bindingIterator = _bindings.iterator();
while (bindingIterator.hasNext())
{
Binding binding = bindingIterator.next();
if (binding.getBindingKey().equals(bindingKey) && binding.getDestination().equals(destination))
{
_bindings.remove(binding);
messageDestination.linkRemoved(this, binding);
onUnbind(new BindingIdentifier(bindingKey, messageDestination));
if (!autoDeleteIfNecessary())
{
if (isDurable() && messageDestination.isDurable())
{
final Collection<Binding> durableBindings = getDurableBindings();
attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
}
}
final Map<String, Object> bindArguments =
UNBIND_ARGUMENTS_CREATOR.createMap(bindingKey, destination);
getEventLogger().message(_logSubject, BindingMessages.DELETED(String.valueOf(bindArguments)));
return true;
}
}
}
return false;
}
@Override
public long getMessagesIn()
{
return _receivedMessageCount.get();
}
public long getMsgRoutes()
{
return _routedMessageCount.get();
}
@Override
public long getMessagesDropped()
{
return _droppedMessageCount.get();
}
@Override
public long getBytesIn()
{
return _receivedMessageSize.get();
}
public long getByteRoutes()
{
return _routedMessageSize.get();
}
@Override
public long getBytesDropped()
{
return _droppedMessageSize.get();
}
@Override
public boolean addBinding(String bindingKey, final Queue<?> queue, Map<String, Object> arguments)
throws AMQInvalidArgumentException
{
return bindInternal(queue.getName(), bindingKey, arguments, false);
}
@Override
public void replaceBinding(String bindingKey,
final Queue<?> queue,
Map<String, Object> arguments) throws AMQInvalidArgumentException
{
bindInternal(queue.getName(), bindingKey, arguments, true);
}
private boolean autoDeleteIfNecessary()
{
if (isAutoDeletePending())
{
LOGGER.debug("Auto-deleting exchange: {}", this);
delete();
return true;
}
else
{
return false;
}
}
private boolean isAutoDeletePending()
{
return (getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
&& getBindingCount() == 0;
}
@SuppressWarnings("unused")
@StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
return Futures.immediateFuture(null);
}
@Override
protected ListenableFuture<Void> onDelete()
{
if (getState() != State.UNINITIALIZED)
{
performDelete();
}
preSetAlternateBinding();
return super.onDelete();
}
public static final class BindingIdentifier
{
private final String _bindingKey;
private final MessageDestination _destination;
public BindingIdentifier(final String bindingKey, final MessageDestination destination)
{
_bindingKey = bindingKey;
_destination = destination;
}
public String getBindingKey()
{
return _bindingKey;
}
public MessageDestination getDestination()
{
return _destination;
}
@Override
public boolean equals(final Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
final BindingIdentifier that = (BindingIdentifier) o;
return _bindingKey.equals(that._bindingKey) && _destination.equals(that._destination);
}
@Override
public int hashCode()
{
int result = _bindingKey.hashCode();
result = 31 * result + _destination.hashCode();
return result;
}
}
// Used by the protocol layers
@Override
public boolean deleteBinding(final String bindingKey, final Queue<?> queue)
{
return unbind(queue.getName(), bindingKey);
}
@Override
public boolean hasBinding(String bindingKey, final Queue<?> queue)
{
if (bindingKey == null)
{
bindingKey = "";
}
for (Binding b : _bindings)
{
if (b.getBindingKey().equals(bindingKey) && b.getDestination().equals(queue.getName()))
{
return true;
}
}
return false;
}
@Override
public NamedAddressSpace getAddressSpace()
{
return _virtualHost;
}
@Override
public void authorisePublish(final SecurityToken token, final Map<String, Object> arguments)
throws AccessControlException
{
authorise(token, PUBLISH_ACTION, arguments);
}
@Override
protected void logOperation(final String operation)
{
getEventLogger().message(ExchangeMessages.OPERATION(operation));
}
@Override
public void linkAdded(final MessageSender sender, final PublishingLink link)
{
Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
if(oldValue != null)
{
_linkedSenders.put(sender, oldValue+1);
}
if( link.TYPE_LINK.equals(link.getType()))
{
getEventLogger().message(SenderMessages.CREATE(link.getName(), link.getDestination()));
}
}
@Override
public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
int oldValue = _linkedSenders.remove(sender);
if(oldValue != 1)
{
_linkedSenders.put(sender, oldValue-1);
}
if( link.TYPE_LINK.equals(link.getType()))
{
getEventLogger().message(SenderMessages.CLOSE(link.getName(), link.getDestination()));
}
}
private void validateOrCreateAlternateBinding(final Exchange<?> exchange, final boolean mayCreate)
{
Object value = exchange.getAttribute(ALTERNATE_BINDING);
if (value instanceof AlternateBinding)
{
AlternateBinding alternateBinding = (AlternateBinding) value;
String destinationName = alternateBinding.getDestination();
MessageDestination messageDestination =
_virtualHost.getAttainedMessageDestination(destinationName, mayCreate);
if (messageDestination == null)
{
throw new UnknownAlternateBindingException(destinationName);
}
else if (messageDestination == this)
{
throw new IllegalConfigurationException(String.format(
"Cannot create alternate binding for '%s' : Alternate binding destination cannot refer to self.",
getName()));
}
else if (isDurable() && !messageDestination.isDurable())
{
throw new IllegalConfigurationException(String.format(
"Cannot create alternate binding for '%s' : Alternate binding destination '%s' is not durable.",
getName(),
destinationName));
}
}
}
}