blob: d89b637bf051b63a5354c24bdb8119bf2ea5537a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.filter.selector.ParseException;
import org.apache.qpid.server.filter.selector.TokenMgrError;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
// Key of the attach property from which the consumer priority is read (see initProperties).
public static final Symbol PRIORITY = Symbol.valueOf("priority");
// Only read and written through getLastDeliveryId()/setLastDeliveryId() within this class.
private UnsignedInteger _lastDeliveryId;
// Delivery tag of the last transfer sent with more=true; null once a transfer without more=true is sent.
private Binary _lastDeliveryTag;
// Delivery tag -> delivery id for transfers that were sent unsettled (populated in transfer()).
private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<>();
// Delivery tag -> message instance, recorded by addUnsettled() when no transaction id is set.
private Map<Binary, MessageInstance> _unsettledMap2 = new HashMap<>();
// Value of the "txn-id" entry of the most recent flow that carried properties.
private Binary _transactionId;
// Consumer priority taken from the peer's attach properties; null when none was supplied or parseable.
private Integer _priority;
// Delivery tags for which flowStateChanged() still has to send a resuming transfer with Accepted state.
private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
// Populated by attachReceived(); NOTE(review): never read within this class - confirm whether still needed.
private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
// True between a drain request with outstanding credit (flowStateChanged) and the call to drained().
private volatile boolean _draining = false;
// Delivery tag -> action invoked when the peer updates the state of that delivery (see handle()).
private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
// Destination messages are consumed from; set by prepareConsumerOptionsAndFilters() or setDestination().
private SendingDestination _destination;
// Consumer options and filters computed by prepareConsumerOptionsAndFilters() and used by createConsumerTarget().
private EnumSet<ConsumerOption> _consumerOptions;
private FilterManager _consumerFilters;
// Consumer target and consumer created by createConsumerTarget().
private ConsumerTarget_1_0 _consumerTarget;
private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link)
{
super(session, link);
setDeliveryCount(UnsignedInteger.valueOf(0));
setAvailable(UnsignedInteger.valueOf(0));
setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
}
/**
 * Does nothing: this endpoint performs no work when started.
 */
@Override
public void start()
{
}
/**
 * Records {@code destination} as this endpoint's destination and computes the consumer options
 * and message filters that {@link #createConsumerTarget()} will later use.
 * <p>
 * For an {@code ExchangeDestination} the consumer always acquires and sees requeues. For a
 * {@code MessageSourceDestination} the source's filters are reduced to the supported ones
 * (no-local and the first JMS selector encountered), the source's filter map is replaced with
 * that reduced set, and the consumer acquires unless the distribution mode is
 * {@code StdDistMode.COPY}. A queue exposing a "topic" attribute forces the distribution mode
 * to {@code COPY}.
 *
 * @param destination the destination this link will consume from
 * @throws AmqpErrorException with condition {@code AmqpError.INVALID_FIELD} when a JMS selector
 *                            filter cannot be parsed; the parser failure is attached as the cause
 * @throws ConnectionScopedRuntimeException when {@code destination} is of neither known type
 */
public void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
{
    // TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
    _destination = destination;
    final Source source = getSource();

    EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
    boolean noLocal = false;
    JMSSelectorFilter messageFilter = null;

    if (destination instanceof ExchangeDestination)
    {
        options.add(ConsumerOption.ACQUIRES);
        options.add(ConsumerOption.SEES_REQUEUES);
    }
    else if (destination instanceof MessageSourceDestination)
    {
        MessageSource messageSource = _destination.getMessageSource();

        if (messageSource instanceof Queue && ((Queue<?>) messageSource).getAvailableAttributes().contains("topic"))
        {
            source.setDistributionMode(StdDistMode.COPY);
        }

        Map<Symbol, Filter> filters = source.getFilter();
        Map<Symbol, Filter> actualFilters = new HashMap<>();

        if (filters != null)
        {
            for (Map.Entry<Symbol, Filter> entry : filters.entrySet())
            {
                if (entry.getValue() instanceof NoLocalFilter)
                {
                    actualFilters.put(entry.getKey(), entry.getValue());
                    noLocal = true;
                }
                // Only the first selector filter encountered is honoured; later ones are dropped.
                else if (messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
                {
                    org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter =
                            (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
                    try
                    {
                        messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
                        actualFilters.put(entry.getKey(), entry.getValue());
                    }
                    catch (ParseException | SelectorParsingException | TokenMgrError e)
                    {
                        Error error = new Error();
                        error.setCondition(AmqpError.INVALID_FIELD);
                        error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
                        error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
                        // Keep the parser failure as the cause so the reason the selector was rejected is not lost.
                        throw new AmqpErrorException(error, e);
                    }
                }
            }
        }
        // The source advertises only the filters that were actually applied.
        source.setFilter(actualFilters.isEmpty() ? null : actualFilters);

        if (source.getDistributionMode() != StdDistMode.COPY)
        {
            options.add(ConsumerOption.ACQUIRES);
            options.add(ConsumerOption.SEES_REQUEUES);
        }
    }
    else
    {
        throw new ConnectionScopedRuntimeException("Unknown destination type");
    }

    if (noLocal)
    {
        options.add(ConsumerOption.NO_LOCAL);
    }

    FilterManager filters = null;
    if (messageFilter != null)
    {
        filters = new FilterManager();
        filters.add(messageFilter.getName(), messageFilter);
    }

    _consumerOptions = options;
    _consumerFilters = filters;
}
/**
 * Creates a new consumer target for this endpoint and registers a consumer for it on the
 * destination's message source, using the options and filters previously computed by
 * {@code prepareConsumerOptionsAndFilters}.
 * <p>
 * The consumer is named after the target address, or after the link name when the target has
 * no address.
 *
 * @throws AmqpErrorException with {@code AmqpError.RESOURCE_LOCKED} when exclusivity rules
 *                            prevent the consumer from being added, or with
 *                            {@code AmqpError.RESOURCE_DELETED} when the queue has been deleted
 */
void createConsumerTarget() throws AmqpErrorException
{
    final Source source = getSource();
    final boolean acquiring = _destination instanceof ExchangeDestination
                              || source.getDistributionMode() != StdDistMode.COPY;
    _consumerTarget = new ConsumerTarget_1_0(this, acquiring);

    try
    {
        final String targetAddress = getTarget().getAddress();
        final String name = targetAddress == null ? getLinkName() : targetAddress;

        final MessageSource messageSource = _destination.getMessageSource();
        _consumer = messageSource.addConsumer(_consumerTarget,
                                              _consumerFilters,
                                              Message_1_0.class,
                                              name,
                                              _consumerOptions,
                                              getPriority());
        _consumerTarget.updateNotifyWorkDesired();
    }
    catch (MessageSource.ExistingExclusiveConsumer e)
    {
        throw new AmqpErrorException(
                new Error(AmqpError.RESOURCE_LOCKED,
                          "Cannot add a consumer to the destination as there is already an exclusive consumer"),
                e);
    }
    catch (MessageSource.ExistingConsumerPreventsExclusive e)
    {
        throw new AmqpErrorException(
                new Error(AmqpError.RESOURCE_LOCKED,
                          "Cannot add an exclusive consumer to the destination as there is already a consumer"),
                e);
    }
    catch (MessageSource.ConsumerAccessRefused e)
    {
        throw new AmqpErrorException(
                new Error(AmqpError.RESOURCE_LOCKED,
                          "Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy"),
                e);
    }
    catch (MessageSource.QueueDeleted e)
    {
        throw new AmqpErrorException(
                new Error(AmqpError.RESOURCE_DELETED,
                          "Cannot add a consumer to the destination as the destination has been deleted"),
                e);
    }
}
/**
 * Derives the link properties to use from the properties offered in the peer's attach.
 * <p>
 * Only {@link #PRIORITY} is recognised. A {@code Number} value is narrowed with
 * {@code intValue()}; a {@code String} value is parsed as a decimal integer. A value that
 * cannot be interpreted is ignored (and logged at debug level), leaving any previously
 * stored priority in place.
 *
 * @param attach the attach received from the peer
 * @return an empty map when the peer sent no properties; otherwise a map holding the priority
 *         entry if a priority is known, and nothing else
 */
@Override
protected Map<Symbol, Object> initProperties(final Attach attach)
{
    final Map<Symbol, Object> peerProperties = attach.getProperties();
    if (peerProperties == null)
    {
        return Collections.emptyMap();
    }

    final Map<Symbol, Object> actualProperties = new HashMap<>();
    if (peerProperties.containsKey(PRIORITY))
    {
        final Object value = peerProperties.get(PRIORITY);
        if (value instanceof Number)
        {
            _priority = ((Number) value).intValue();
        }
        else if (value instanceof String)
        {
            try
            {
                _priority = Integer.parseInt(value.toString());
            }
            catch (NumberFormatException e)
            {
                // Best effort: a malformed priority must not fail the attach, but leave a trace
                // so that a misconfigured client can be diagnosed.
                LOGGER.debug("Ignoring unparseable link priority property '{}'", value, e);
            }
        }

        if (_priority != null)
        {
            actualProperties.put(PRIORITY, _priority);
        }
    }
    return actualProperties;
}
/**
 * Re-attaches this endpoint to its already established source terminus.
 *
 * @param attach the attach received from the peer
 * @throws AmqpErrorException if preparing the consumer or processing the attach fails
 * @throws IllegalStateException if this endpoint has no source, or the attach carries no source
 */
@Override
protected void reattachLink(final Attach attach) throws AmqpErrorException
{
    reattachToExistingTerminus(attach);
}

/**
 * Resumes this endpoint on its already established source terminus and then calls
 * {@link #initialiseUnsettled()}.
 *
 * @param attach the attach received from the peer
 * @throws AmqpErrorException if preparing the consumer or processing the attach fails
 * @throws IllegalStateException if this endpoint has no source, or the attach carries no source
 */
@Override
protected void resumeLink(final Attach attach) throws AmqpErrorException
{
    reattachToExistingTerminus(attach);
    initialiseUnsettled();
}

/**
 * Shared implementation of {@link #reattachLink(Attach)} and {@link #resumeLink(Attach)}:
 * re-resolves the destination from the existing source, recomputes the consumer options and
 * filters, lets the session update an exchange-backed subscription from the newly offered
 * (non-dynamic) source, and finally processes the attach.
 */
private void reattachToExistingTerminus(final Attach attach) throws AmqpErrorException
{
    if (getSource() == null)
    {
        throw new IllegalStateException("Terminus should be set when resuming a Link.");
    }
    if (attach.getSource() == null)
    {
        throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
    }

    Source newSource = (Source) attach.getSource();
    Source oldSource = getSource();

    final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource);
    prepareConsumerOptionsAndFilters(destination);

    if (getDestination() instanceof ExchangeDestination && !Boolean.TRUE.equals(newSource.getDynamic()))
    {
        final SendingDestination newDestination =
                getSession().getSendingDestination(getLinkName(), newSource);
        // Switch to the new destination only when the session accepted the updated source.
        if (getSession().updateSourceForSubscription(this, newSource, newDestination))
        {
            setDestination(newDestination);
        }
    }

    attachReceived(attach);
}
/**
 * Establishes a brand new link from the peer's attach.
 *
 * @param attach the attach received from the peer
 * @throws AmqpErrorException if processing the attach fails
 * @throws IllegalStateException if a source or a target is already set on this endpoint
 */
@Override
protected void establishLink(final Attach attach) throws AmqpErrorException
{
    final boolean terminiUnset = getSource() == null && getTarget() == null;
    if (!terminiUnset)
    {
        throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
    }

    attachReceived(attach);
}
/**
 * Recovers the link using the source terminus already held by this endpoint: resolves the
 * sending destination for it, recomputes the consumer options and filters, and processes the
 * attach.
 *
 * @param attach the attach received from the peer
 * @throws AmqpErrorException with {@code AmqpError.NOT_FOUND} when this endpoint has no source,
 *                            or if preparing the consumer or processing the attach fails
 */
@Override
protected void recoverLink(final Attach attach) throws AmqpErrorException
{
    final Source existingSource = getSource();
    if (existingSource == null)
    {
        final Error notFound = new Error(AmqpError.NOT_FOUND, "");
        throw new AmqpErrorException(notFound);
    }

    final SendingDestination destination =
            getSession().getSendingDestination(getLinkName(), existingSource);
    prepareConsumerOptionsAndFilters(destination);

    attachReceived(attach);
}
/**
 * Returns the role of this endpoint, which is always {@code Role.SENDER}.
 */
@Override
public Role getRole()
{
return Role.SENDER;
}
/**
 * Returns the consumer priority stored by {@code initProperties}, or {@code null} when no
 * usable priority has been received from the peer.
 */
public Integer getPriority()
{
return _priority;
}
/**
 * Returns the durability of this endpoint's source terminus.
 *
 * @return the value of {@code getDurable()} on the current source
 */
public TerminusDurability getTerminusDurability()
{
    final Source source = getSource();
    return source.getDurable();
}
/**
 * Sends a transfer frame on this link through the owning session.
 * <p>
 * The message format is forced to zero, the delivery count is advanced by one, the transfer is
 * stamped with this link's local handle and handed to the session. A transfer that is not
 * sent settled is remembered by delivery tag so a later disposition can be matched to it.
 *
 * @param xfr the transfer to send; it is modified in place
 * @param decrementCredit whether one unit of link credit should be consumed
 * @return always {@code true}
 */
public boolean transfer(final Transfer xfr, final boolean decrementCredit)
{
    final Session_1_0 session = getSession();

    xfr.setMessageFormat(UnsignedInteger.ZERO);
    if (decrementCredit)
    {
        final UnsignedInteger remainingCredit = getLinkCredit().subtract(UnsignedInteger.ONE);
        setLinkCredit(remainingCredit);
    }

    final int nextDeliveryCount = getDeliveryCount().intValue() + 1;
    setDeliveryCount(UnsignedInteger.valueOf(nextDeliveryCount));
    xfr.setHandle(getLocalHandle());

    // The flag is true unless this frame carries the same tag as a preceding frame sent with more=true.
    final boolean tagDiffersFromPrevious = !xfr.getDeliveryTag().equals(_lastDeliveryTag);
    session.sendTransfer(xfr, this, tagDiffersFromPrevious);

    final boolean sentSettled = Boolean.TRUE.equals(xfr.getSettled());
    if (!sentSettled)
    {
        _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
    }

    final boolean moreToFollow = Boolean.TRUE.equals(xfr.getMore());
    _lastDeliveryTag = moreToFollow ? xfr.getDeliveryTag() : null;

    return true;
}
/**
 * Completes a pending drain, if there is one: the remaining link credit is added to the
 * delivery count, the credit is set to zero and a flow is sent.
 *
 * @return {@code true} if a drain was pending and has now been completed, {@code false} if no
 *         drain was in progress
 */
public boolean drained()
{
    if (!_draining)
    {
        return false;
    }

    final UnsignedInteger advancedCount = getDeliveryCount().add(getLinkCredit());
    setDeliveryCount(advancedCount);
    setLinkCredit(UnsignedInteger.ZERO);
    sendFlow();
    _draining = false;
    return true;
}
/**
 * Applies a flow received from the peer: records the drain flag, picks up the "txn-id" flow
 * property when properties are present, recomputes the link credit and then calls
 * {@link #flowStateChanged()}.
 * <p>
 * When the flow carries no delivery count its link credit is taken as is. Otherwise the credit
 * becomes (peer delivery count + peer credit) minus this endpoint's delivery count, or zero if
 * that limit does not exceed this endpoint's delivery count.
 *
 * @param flow the flow received from the peer
 */
@Override
public void receiveFlow(final Flow flow)
{
    final UnsignedInteger peerDeliveryCount = flow.getDeliveryCount();
    final UnsignedInteger peerLinkCredit = flow.getLinkCredit();
    setDrain(flow.getDrain());

    final Map<?, ?> flowProperties = flow.getProperties();
    if (flowProperties != null)
    {
        _transactionId = (Binary) flowProperties.get(Symbol.valueOf("txn-id"));
    }

    if (peerDeliveryCount == null)
    {
        setLinkCredit(peerLinkCredit);
    }
    else
    {
        final UnsignedInteger limit = peerDeliveryCount.add(peerLinkCredit);
        final boolean limitReached = limit.compareTo(getDeliveryCount()) <= 0;
        if (limitReached)
        {
            setLinkCredit(UnsignedInteger.valueOf(0));
        }
        else
        {
            setLinkCredit(limit.subtract(getDeliveryCount()));
        }
    }

    flowStateChanged();
}
/**
 * Reacts to a change in flow state.
 * <p>
 * When the peer asked to drain and credit is available, the endpoint enters the draining state
 * and flushes the consumer target. Then, while credit permits, one resuming transfer with
 * {@code Accepted} state is sent for each delivery tag queued in the resume list. The consumer
 * target is told about the flow change only once that list is empty.
 */
@Override
public void flowStateChanged()
{
    final boolean drainRequested = Boolean.TRUE.equals(getDrain());
    if (drainRequested && getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
    {
        _draining = true;
        getConsumerTarget().flush();
    }

    while (!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
    {
        final Accepted acceptedState = new Accepted();
        final Transfer resumeTransfer = new Transfer();
        final Binary deliveryTag = _resumeAcceptedTransfers.remove(0);

        resumeTransfer.setDeliveryTag(deliveryTag);
        resumeTransfer.setState(acceptedState);
        resumeTransfer.setResume(Boolean.TRUE);

        transfer(resumeTransfer, true);
        resumeTransfer.dispose();
    }

    // Pending resume transfers take precedence over regular message delivery.
    if (_resumeAcceptedTransfers.isEmpty())
    {
        getConsumerTarget().flowStateChanged();
    }
}
/**
 * Handles a detach initiated by the peer.
 * <p>
 * The consumer target is closed first. The link is then closed if the detach was a close, or
 * if the source's expiry policy says the terminus expires now (on link detach, or on
 * session end / connection close while the session / connection is closing). In that case
 * every outstanding unsettled action is processed with a {@code Modified} state flagged as
 * delivery-failed, and for an exchange-backed destination on a
 * {@code QueueManagingVirtualHost} the subscription queue is removed. Failures of that removal
 * are reported through the error passed to {@code close}.
 * <p>
 * Otherwise the link is only detached, and additionally destroyed when the peer's detach
 * carried an error.
 *
 * @param detach the detach received from the peer
 */
@Override
protected void remoteDetachedPerformDetach(final Detach detach)
{
    getConsumerTarget().close();

    final TerminusExpiryPolicy expiryPolicy = getSource().getExpiryPolicy();
    final boolean closeLink =
            Boolean.TRUE.equals(detach.getClosed())
            || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
            || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
            || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy)
                && getSession().getConnection().isClosing());

    if (!closeLink)
    {
        final boolean peerReportedError = detach.getError() != null;
        detach();
        if (peerReportedError)
        {
            destroy();
        }
        getConsumerTarget().updateNotifyWorkDesired();
        return;
    }

    final Modified deliveryFailed = new Modified();
    deliveryFailed.setDeliveryFailed(true);
    for (UnsettledAction pendingAction : _unsettledActionMap.values())
    {
        pendingAction.process(deliveryFailed, Boolean.TRUE);
    }
    _unsettledActionMap.clear();

    Error closingError = null;
    if (getDestination() instanceof ExchangeDestination
        && getSession().getConnection().getAddressSpace() instanceof QueueManagingVirtualHost)
    {
        try
        {
            final QueueManagingVirtualHost virtualHost =
                    (QueueManagingVirtualHost) getSession().getConnection().getAddressSpace();
            final ExchangeDestination exchangeDestination = (ExchangeDestination) getDestination();
            virtualHost.removeSubscriptionQueue(exchangeDestination.getQueue().getName());
        }
        catch (AccessControlException e)
        {
            LOGGER.error("Error unregistering subscription", e);
            closingError = new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription");
        }
        catch (IllegalStateException e)
        {
            closingError = new Error(AmqpError.RESOURCE_LOCKED, e.getMessage());
        }
        catch (NotFoundException e)
        {
            closingError = new Error(AmqpError.NOT_FOUND, e.getMessage());
        }
    }

    close(closingError);
}
/**
 * Registers the action to run when the peer updates the state of the delivery identified by
 * {@code tag}. When no transaction id is currently set, the message instance is also
 * remembered under the same tag.
 *
 * @param tag the delivery tag
 * @param unsettledAction the action to invoke on a state update for this delivery
 * @param queueEntry the message instance that was delivered
 */
public void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance queueEntry)
{
    _unsettledActionMap.put(tag, unsettledAction);

    final boolean transactional = getTransactionId() != null;
    if (!transactional)
    {
        _unsettledMap2.put(tag, queueEntry);
    }
}
/**
 * Processes a delivery state update from the peer for {@code deliveryTag}.
 * <p>
 * The registered unsettled action, if any, is given the new state. If the action reports that
 * the delivery should be settled locally and the peer has not settled it, a settled disposition
 * is sent back. Once the delivery is settled by either side, all bookkeeping for the tag is
 * dropped.
 *
 * @param deliveryTag the tag of the delivery being updated
 * @param state the delivery state reported by the peer
 * @param settled whether the peer has settled the delivery; may be {@code null}
 */
@Override
protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
    final boolean remotelySettled = Boolean.TRUE.equals(settled);
    final UnsettledAction action = _unsettledActionMap.get(deliveryTag);

    boolean locallySettled = false;
    if (action != null)
    {
        locallySettled = action.process(state, settled);
        if (locallySettled && !remotelySettled)
        {
            updateDisposition(deliveryTag, state, true);
        }
    }

    if (remotelySettled || locallySettled)
    {
        _unsettledActionMap.remove(deliveryTag);
        _unsettledMap.remove(deliveryTag);
        _unsettledMap2.remove(deliveryTag);
    }
}
/**
 * Looks up a transaction on the owning session.
 *
 * @param transactionId the id of the transaction to look up
 * @return the session's transaction for {@code transactionId}, or {@code null} when this
 *         endpoint has no session
 */
public ServerTransaction getTransaction(Binary transactionId)
{
    final Session_1_0 session = getSession();
    if (session == null)
    {
        return null;
    }
    return session.getTransaction(transactionId);
}
/**
 * Tells whether a transfer may currently be sent on this link.
 *
 * @return {@code true} only when the link credit is set and greater than zero and the session
 *         itself reports credit to send
 */
public boolean hasCreditToSend()
{
    final UnsignedInteger linkCredit = getLinkCredit();
    if (linkCredit == null)
    {
        return false;
    }
    if (linkCredit.compareTo(UnsignedInteger.valueOf(0)) <= 0)
    {
        return false;
    }
    return getSession().hasCreditToSend();
}
/**
 * Returns the delivery id most recently stored with {@code setLastDeliveryId}, or
 * {@code null} if none has been stored.
 */
public UnsignedInteger getLastDeliveryId()
{
return _lastDeliveryId;
}
/**
 * Stores the given delivery id so that {@code getLastDeliveryId} returns it.
 *
 * @param deliveryId the delivery id to remember
 */
public void setLastDeliveryId(final UnsignedInteger deliveryId)
{
_lastDeliveryId = deliveryId;
}
/**
 * Sends a settled disposition for the delivery identified by {@code deliveryTag}.
 * <p>
 * Nothing happens unless {@code settled} is {@code true} and the tag is still tracked as
 * unsettled. In that case the tag is removed from the unsettled bookkeeping and the session is
 * asked to update the disposition of the corresponding delivery id.
 *
 * @param deliveryTag the tag of the delivery
 * @param state the delivery state to report
 * @param settled whether the delivery is being settled
 */
public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
{
    if (!settled)
    {
        return;
    }

    final UnsignedInteger deliveryId = _unsettledMap.remove(deliveryTag);
    if (deliveryId == null)
    {
        return;
    }

    _unsettledMap2.remove(deliveryTag);
    getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
}
/**
 * Returns the transaction id taken from the "txn-id" entry of the most recent flow that
 * carried properties, or {@code null} if there is none.
 */
public Binary getTransactionId()
{
return _transactionId;
}
/**
 * Processes an attach received from the peer.
 * <p>
 * After delegating to the superclass, a source terminus is built from the attach's source when
 * this endpoint does not have one yet, the sending destination is resolved for it and the
 * consumer options and filters are prepared. The target from the attach is then set on the
 * link and a fresh consumer target and consumer are created. Finally every locally unsettled
 * delivery is reconciled against the initial unsettled map obtained from
 * {@code getInitialUnsettledMap()}.
 *
 * @param attach the attach received from the peer
 * @throws AmqpErrorException if the superclass, the consumer preparation or the consumer
 *                            creation reports an error
 */
@Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
Target target = (Target) attach.getTarget();
Source source = getSource();
if (source == null)
{
// No terminus yet: build the local source from the fields the peer offered.
// NOTE(review): attach.getSource() is dereferenced without a null check here - confirm callers guarantee it.
source = new Source();
Source attachSource = (Source) attach.getSource();
source.setAddress(attachSource.getAddress());
source.setDynamic(attachSource.getDynamic());
// Durability is capped at the highest level the link supports.
source.setDurable(TerminusDurability.min(attachSource.getDurable(),
getLink().getHighestSupportedTerminusDurability()));
source.setExpiryPolicy(attachSource.getExpiryPolicy());
source.setDistributionMode(attachSource.getDistributionMode());
source.setFilter(attachSource.getFilter());
source.setCapabilities(attachSource.getCapabilities());
final SendingDestination destination = getSession().getSendingDestination(attach.getName(), source);
// The capabilities copied from the attach above are replaced by those of the resolved destination.
source.setCapabilities(destination.getCapabilities());
if (destination instanceof ExchangeDestination)
{
ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
exchangeDestination.getQueue()
.setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE,
org.apache.qpid.server.model.State.ACTIVE));
}
getLink().setSource(source);
prepareConsumerOptionsAndFilters(destination);
}
getLink().setTarget(target);
// Captured BEFORE createConsumerTarget() replaces _consumer, so this is the previously
// registered consumer (null when no consumer has been created yet).
final MessageInstanceConsumer consumer = getConsumer();
createConsumerTarget();
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
// Iterate over a copy because entries are removed from _unsettledMap2 inside the loop.
Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
Map initialUnsettledMap = getInitialUnsettledMap();
for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
final MessageInstance queueEntry = entry.getValue();
if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
// Delivery is not in the initial unsettled map: mark it redelivered and release it.
queueEntry.setRedelivered();
queueEntry.release(consumer);
_unsettledMap2.remove(deliveryTag);
}
else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
{
// The initial unsettled map already holds an outcome for this delivery.
Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
if (outcome instanceof Accepted)
{
AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
if (consumer.acquires())
{
if (queueEntry.acquire() || queueEntry.isAcquired())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
{
public void postCommit()
{
queueEntry.delete();
}
public void onRollback()
{
}
});
}
}
}
else if (outcome instanceof Released)
{
// NOTE(review): this dequeues the entry through the transaction and then releases it
// in postCommit - confirm that dequeuing a released message is intended.
AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
if (consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
{
public void postCommit()
{
queueEntry.release(consumer);
}
public void onRollback()
{
}
});
}
}
//_unsettledMap.remove(deliveryTag);
initialUnsettledMap.remove(deliveryTag);
// NOTE(review): the tag is queued for every Outcome type, and flowStateChanged() later
// resends it with an Accepted state - confirm this is intended for non-Accepted outcomes.
_resumeAcceptedTransfers.add(deliveryTag);
}
else
{
_resumeFullTransfers.add(queueEntry);
// exists in receivers map, but not yet got an outcome ... should resend with resume = true
// NOTE(review): _resumeFullTransfers is never read within this class, so no resend happens here.
}
}
getConsumerTarget().updateNotifyWorkDesired();
}
/**
 * Hook invoked when a link is resumed (see {@code resumeLink}); currently has no effect.
 */
@Override
public void initialiseUnsettled()
{
    // Deliberately empty. Copying _unsettledMap2 into a method-local map and nulling the copy's
    // values, as was done here, changes no state of this endpoint, so that work is omitted.
    // TODO(review): if the intent was to expose the locally unsettled delivery tags to the peer
    // on resume, the result needs to be stored somewhere it is actually read.
}
/**
 * Returns the consumer registered by the most recent successful {@code createConsumerTarget()}
 * call, or {@code null} if no consumer has been registered yet.
 */
public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
{
return _consumer;
}
/**
 * Returns the consumer target created by the most recent {@code createConsumerTarget()} call,
 * or {@code null} if none has been created yet.
 */
public ConsumerTarget_1_0 getConsumerTarget()
{
return _consumerTarget;
}
/**
 * Returns the destination this endpoint consumes from, as last set by
 * {@code prepareConsumerOptionsAndFilters} or {@code setDestination}.
 */
public SendingDestination getDestination()
{
return _destination;
}
/**
 * Replaces the destination this endpoint consumes from. Only the reference is updated; no
 * consumer options, filters or consumers are recomputed here.
 *
 * @param destination the new destination
 */
public void setDestination(final SendingDestination destination)
{
_destination = destination;
}
}