blob: 8f84530c6f7560fa5561c8cdccdce4e412b08c75 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import static org.apache.qpid.server.protocol.v1_0.Session_1_0.GLOBAL_CAPABILITY;
import static org.apache.qpid.server.protocol.v1_0.Session_1_0.SHARED_CAPABILITY;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.filter.selector.ParseException;
import org.apache.qpid.server.filter.selector.TokenMgrError;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class ExchangeSendingDestination extends StandardSendingDestination
{
private static final Accepted ACCEPTED = new Accepted();
private static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
public static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
private final Exchange<?> _exchange;
private final Symbol[] _capabilities;
private final Map<Symbol, Filter> _filters;
ExchangeSendingDestination(Exchange<?> exchange,
String linkName,
String bindingKey,
String remoteContainerId,
Source source) throws AmqpErrorException
{
this(exchange, bindingKey, source, getMangledSubscriptionName(linkName, remoteContainerId, source));
}
private ExchangeSendingDestination(final Exchange<?> exchange,
final String bindingKey,
final Source source,
final String subscriptionName) throws AmqpErrorException
{
this(exchange, source, subscriptionName, createBindingInfo(exchange, subscriptionName, bindingKey, source));
}
private ExchangeSendingDestination(final Exchange<?> exchange,
final Source source,
final String subscriptionName,
final BindingInfo bindingInfo)
throws AmqpErrorException
{
this(exchange, getQueue(exchange, source, subscriptionName, bindingInfo), bindingInfo, source.getCapabilities());
}
private ExchangeSendingDestination(final Exchange<?> exchange,
final Queue<?> queue,
final BindingInfo bindingInfo,
final Symbol[] capabilities)
{
super(queue);
_exchange = exchange;
_filters = bindingInfo.getActualFilters().isEmpty() ? null : bindingInfo.getActualFilters();
List<Symbol> sourceCapabilities = new ArrayList<>();
if (hasCapability(capabilities, GLOBAL_CAPABILITY))
{
sourceCapabilities.add(GLOBAL_CAPABILITY);
}
if (hasCapability(capabilities, SHARED_CAPABILITY))
{
sourceCapabilities.add(SHARED_CAPABILITY);
}
sourceCapabilities.add(TOPIC_CAPABILITY);
_capabilities = sourceCapabilities.toArray(new Symbol[sourceCapabilities.size()]);
}
private static BindingInfo createBindingInfo(final Exchange<?> exchange,
final String subscriptionName,
final String bindingKey, final Source source)
throws AmqpErrorException
{
return new BindingInfo(exchange, subscriptionName,
bindingKey, source.getFilter());
}
private static String getMangledSubscriptionName(final String linkName,
final String remoteContainerId,
final Source source)
{
boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
boolean isGlobal = hasCapability(source.getCapabilities(), GLOBAL_CAPABILITY);
return getMangledSubscriptionName(linkName, isDurable, isShared, isGlobal, remoteContainerId);
}
private static Queue<?> getQueue(Exchange<?> exchange, Source source, String subscriptionName, BindingInfo bindingInfo)
throws AmqpErrorException
{
boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
QueueManagingVirtualHost virtualHost;
if (exchange.getAddressSpace() instanceof QueueManagingVirtualHost)
{
virtualHost = (QueueManagingVirtualHost) exchange.getAddressSpace();
}
else
{
throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
"Address space of unexpected type"));
}
Queue<?> queue;
final Map<String, Object> attributes = new HashMap<>();
ExclusivityPolicy exclusivityPolicy;
if (isShared)
{
exclusivityPolicy = ExclusivityPolicy.SHARED_SUBSCRIPTION;
}
else
{
exclusivityPolicy = ExclusivityPolicy.LINK;
}
org.apache.qpid.server.model.LifetimePolicy lifetimePolicy = getLifetimePolicy(source.getExpiryPolicy());
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, subscriptionName);
attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
attributes.put(Queue.DURABLE, isDurable);
Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
try
{
queue = virtualHost.getSubscriptionQueue(exchange.getName(), attributes, bindings);
}
catch (NotFoundException e)
{
throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
}
catch(IllegalStateException e)
{
throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED,
"Subscription is already in use"));
}
return queue;
}
private static boolean hasCapability(final Symbol[] capabilities,
final Symbol expectedCapability)
{
return (capabilities != null && Arrays.asList(capabilities).contains(expectedCapability));
}
private static LifetimePolicy getLifetimePolicy(final TerminusExpiryPolicy expiryPolicy) throws AmqpErrorException
{
LifetimePolicy lifetimePolicy;
if (expiryPolicy == null || expiryPolicy == TerminusExpiryPolicy.SESSION_END)
{
lifetimePolicy = LifetimePolicy.DELETE_ON_SESSION_END;
}
else if (expiryPolicy == TerminusExpiryPolicy.LINK_DETACH)
{
lifetimePolicy = LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
}
else if (expiryPolicy == TerminusExpiryPolicy.CONNECTION_CLOSE)
{
lifetimePolicy = LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
}
else if (expiryPolicy == TerminusExpiryPolicy.NEVER)
{
lifetimePolicy = LifetimePolicy.PERMANENT;
}
else
{
Error error = new Error(AmqpError.NOT_IMPLEMENTED,
String.format("unknown ExpiryPolicy '%s'", expiryPolicy.getValue()));
throw new AmqpErrorException(error);
}
return lifetimePolicy;
}
private static String getMangledSubscriptionName(final String linkName,
final boolean isDurable,
final boolean isShared,
final boolean isGlobal,
String remoteContainerId)
{
if (isGlobal)
{
remoteContainerId = "_global_";
}
else
{
remoteContainerId = sanitizeName(remoteContainerId);
}
String subscriptionName;
if (!isDurable && !isShared)
{
subscriptionName = UUID.randomUUID().toString();
}
else
{
subscriptionName = linkName;
if (isShared)
{
int separator = subscriptionName.indexOf("|");
if (separator > 0)
{
subscriptionName = subscriptionName.substring(0, separator);
}
}
subscriptionName = sanitizeName(subscriptionName);
}
return "qpidsub_/" + remoteContainerId + "_/" + subscriptionName + "_/" + (isDurable
? "durable"
: "nondurable");
}
private static String sanitizeName(String name)
{
return name.replace("_", "__")
.replace(".", "_:")
.replace("(", "_O")
.replace(")", "_C")
.replace("<", "_L")
.replace(">", "_R");
}
@Override
public Outcome[] getOutcomes()
{
return OUTCOMES;
}
public Exchange<?> getExchange()
{
return _exchange;
}
Map<Symbol, Filter> getFilters()
{
return _filters == null ? null : Collections.unmodifiableMap(_filters);
}
@Override
public Symbol[] getCapabilities()
{
return _capabilities;
}
public Queue<?> getQueue()
{
return (Queue<?>) getMessageSource();
}
private static final class BindingInfo
{
private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
private final Map<String, Map<String, Object>> _bindings = new HashMap<>();
BindingInfo(Exchange<?> exchange,
final String queueName,
String bindingKey,
Map<Symbol, Filter> filters) throws AmqpErrorException
{
String binding = null;
final Map<String, Object> arguments = new HashMap<>();
if (filters != null && !filters.isEmpty())
{
boolean hasBindingFilter = false;
boolean hasMessageFilter = false;
for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
{
if(!hasBindingFilter
&& entry.getValue() instanceof ExactSubjectFilter
&& exchange.getType().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
ExactSubjectFilter filter = (ExactSubjectFilter) entry.getValue();
binding = filter.getValue();
_actualFilters.put(entry.getKey(), filter);
hasBindingFilter = true;
}
else if(!hasBindingFilter
&& entry.getValue() instanceof MatchingSubjectFilter
&& exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
MatchingSubjectFilter filter = (MatchingSubjectFilter) entry.getValue();
binding = filter.getValue();
_actualFilters.put(entry.getKey(), filter);
hasBindingFilter = true;
}
else if(entry.getValue() instanceof NoLocalFilter)
{
_actualFilters.put(entry.getKey(), entry.getValue());
arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), true);
}
else if (!hasMessageFilter
&& entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
{
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter =
(org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
// TODO: QPID-7642 - due to inconsistent handling of invalid filters
// by different exchange implementations
// we need to validate filter before creation of binding
try
{
new org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
}
catch (ParseException | SelectorParsingException | TokenMgrError e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
throw new AmqpErrorException(error);
}
arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), selectorFilter.getValue());
_actualFilters.put(entry.getKey(), selectorFilter);
hasMessageFilter = true;
}
}
}
if(binding != null)
{
_bindings.put(binding, arguments);
}
if(bindingKey != null)
{
_bindings.put(bindingKey, arguments);
}
if(binding == null
&& bindingKey == null
&& exchange.getType().equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
{
_bindings.put(queueName, arguments);
}
else if(binding == null
&& bindingKey == null
&& exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
_bindings.put("#", arguments);
}
}
Map<Symbol, Filter> getActualFilters()
{
return _actualFilters;
}
Map<String, Map<String, Object>> getBindings()
{
return _bindings;
}
@Override
public boolean equals(final Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
final BindingInfo that = (BindingInfo) o;
return _actualFilters.equals(that._actualFilters) && _bindings.equals(that._bindings);
}
@Override
public int hashCode()
{
int result = _actualFilters.hashCode();
result = 31 * result + _bindings.hashCode();
return result;
}
}
}