blob: d3abefc8dabbd88c76946e602ccdf0fc2dfe6b0b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.DestinationAddress;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistryImpl;
import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Terminus;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.queue.CreatingLinkInfo;
import org.apache.qpid.server.queue.CreatingLinkInfoImpl;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
implements LogSubject, org.apache.qpid.server.util.Deletable<Session_1_0>
{
static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
static final Symbol SHARED_CAPABILITY = Symbol.getSymbol("shared");
static final Symbol GLOBAL_CAPABILITY = Symbol.getSymbol("global");
private static final Logger LOGGER = LoggerFactory.getLogger(Session_1_0.class);
public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private static final EnumSet<SessionState> END_STATES =
EnumSet.of(SessionState.END_RECVD, SessionState.END_PIPE, SessionState.END_SENT, SessionState.ENDED);
private final AMQPConnection_1_0<?> _connection;
private final AtomicBoolean _closed = new AtomicBoolean();
private SessionState _sessionState;
private final Map<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
private final Map<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> _inputHandleToEndpoint = new HashMap<>();
private final Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> _associatedLinkEndpoints = new HashSet<>();
private final int _sendingChannel;
private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
private int _nextOutgoingDeliveryId;
private final UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
private SequenceNumber _nextIncomingId;
private final UnsignedInteger _incomingWindow;
private final SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
private volatile long _remoteIncomingWindow;
private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
private UnsignedInteger _lastSentIncomingLimit;
private final DeliveryRegistry _outgoingDeliveryRegistry = new DeliveryRegistryImpl();
private final DeliveryRegistry _incomingDeliveryRegistry = new DeliveryRegistryImpl();
private final Error _sessionEndedLinkError =
new Error(LinkError.DETACH_FORCED,
"Force detach the link because the session is remotely ended.");
private final String _primaryDomain;
private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
public Session_1_0(final AMQPConnection_1_0 connection,
Begin begin,
int sendingChannelId,
int receivingChannelId,
long incomingWindow)
{
super(connection, sendingChannelId);
_sendingChannel = sendingChannelId;
_sessionState = SessionState.ACTIVE;
_nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_primaryDomain = getPrimaryDomain();
_incomingWindow = UnsignedInteger.valueOf(incomingWindow);
AccessController.doPrivileged((new PrivilegedAction<Object>()
{
@Override
public Object run()
{
_connection.getEventLogger().message(ChannelMessages.CREATE());
return null;
}
}), _accessControllerContext);
}
public void sendDetach(final Detach detach)
{
send(detach);
}
public void receiveAttach(final Attach attach)
{
receivedComplete();
if(_sessionState == SessionState.ACTIVE)
{
UnsignedInteger inputHandle = attach.getHandle();
if (_inputHandleToEndpoint.containsKey(inputHandle))
{
String errorMessage = String.format("Input Handle '%d' already in use", inputHandle.intValue());
getConnection().close(new Error(SessionError.HANDLE_IN_USE, errorMessage));
throw new ConnectionScopedRuntimeException(errorMessage);
}
else
{
final Link_1_0<? extends BaseSource, ? extends BaseTarget> link;
if (attach.getRole() == Role.RECEIVER)
{
link = getAddressSpace().getSendingLink(getConnection().getRemoteContainerId(), attach.getName());
}
else
{
link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName());
}
final ListenableFuture<? extends LinkEndpoint<?,?>> future = link.attach(this, attach);
addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
}
}
}
private void updateDisposition(final Role role,
final UnsignedInteger first,
final UnsignedInteger last,
final DeliveryState state, final boolean settled)
{
Disposition disposition = new Disposition();
disposition.setRole(role);
disposition.setFirst(first);
disposition.setLast(last);
disposition.setSettled(settled);
disposition.setState(state);
if (settled)
{
final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
SequenceNumber pos = new SequenceNumber(first.intValue());
SequenceNumber end = new SequenceNumber(last.intValue());
while (pos.compareTo(end) <= 0)
{
deliveryRegistry.removeDelivery(UnsignedInteger.valueOf(pos.intValue()));
pos.incr();
}
}
send(disposition);
}
void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
final Binary deliveryTag,
final DeliveryState state,
final boolean settled)
{
final UnsignedInteger deliveryId = getDeliveryId(deliveryTag, linkEndpoint);
updateDisposition(linkEndpoint.getRole(), deliveryId, deliveryId, state, settled);
}
private UnsignedInteger getDeliveryId(final DeliveryRegistry deliveryRegistry,
final Binary deliveryTag,
final LinkEndpoint<?, ?> linkEndpoint)
{
final UnsignedInteger deliveryId = deliveryRegistry.getDeliveryId(deliveryTag, linkEndpoint);
if (deliveryId == null)
{
throw new ConnectionScopedRuntimeException(String.format(
"Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
}
return deliveryId;
}
private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary> deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
{
final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
return deliveryTags.stream()
.map(deliveryTag -> getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint))
.collect(Collectors.toCollection(TreeSet::new));
}
private UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
{
final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
return getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint);
}
private DeliveryRegistry getDeliveryRegistry(final Role role)
{
return role == Role.RECEIVER ? getIncomingDeliveryRegistry() : getOutgoingDeliveryRegistry();
}
void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
final Set<Binary> deliveryTags,
final DeliveryState state,
final boolean settled)
{
final Role role = linkEndpoint.getRole();
final Iterator<UnsignedInteger> iterator = getDeliveryIds(deliveryTags, linkEndpoint).iterator();
if (iterator.hasNext())
{
UnsignedInteger begin = iterator.next();
UnsignedInteger end = begin;
while (iterator.hasNext())
{
final UnsignedInteger deliveryId = iterator.next();
if (!end.add(UnsignedInteger.ONE).equals(deliveryId))
{
updateDisposition(role, begin, end, state, settled);
begin = deliveryId;
end = begin;
}
else
{
end = deliveryId;
}
}
updateDisposition(role, begin, end, state, settled);
}
}
public boolean hasCreditToSend()
{
boolean b = _remoteIncomingWindow > 0;
boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
return b && b1;
}
public void end()
{
end(new End());
}
void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint)
{
_nextOutgoingId.incr();
final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
UnsignedInteger deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
xfr.setDeliveryId(deliveryId);
if (!settled)
{
final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
_outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
}
_remoteIncomingWindow--;
try (QpidByteBuffer payload = xfr.getPayload())
{
long remaining = payload == null ? 0 : (long) payload.remaining();
int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
if(payload != null)
{
while (payloadSent < remaining && payloadSent >= 0)
{
Transfer continuationTransfer = new Transfer();
continuationTransfer.setHandle(xfr.getHandle());
continuationTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
continuationTransfer.setState(xfr.getState());
continuationTransfer.setPayload(payload);
_nextOutgoingId.incr();
_remoteIncomingWindow--;
remaining = (long) payload.remaining();
payloadSent = _connection.sendFrame(_sendingChannel, continuationTransfer, payload);
continuationTransfer.dispose();
}
}
}
catch (OversizeFrameException e)
{
throw new ConnectionScopedRuntimeException(e);
}
}
public boolean isActive()
{
return _sessionState == SessionState.ACTIVE;
}
public void receiveEnd(final End end)
{
receivedComplete();
switch (_sessionState)
{
case END_SENT:
remoteEnd(end);
_sessionState = SessionState.ENDED;
break;
case ACTIVE:
_sessionState = SessionState.END_RECVD;
detachLinks();
remoteEnd(end);
_connection.sendEnd(_sendingChannel, new End(), true);
_sessionState = SessionState.ENDED;
break;
default:
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
_connection.sendEnd(_sendingChannel, reply, true);
break;
}
}
public UnsignedInteger getNextOutgoingId()
{
return UnsignedInteger.valueOf(_nextOutgoingId.intValue());
}
public void sendFlowConditional()
{
if(_nextIncomingId != null)
{
UnsignedInteger clientsCredit =
_lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingId.intValue()));
// TODO - we should use a better metric here, and/or manage session credit across the whole connection
// send a flow if the window is at least half used up
if (_incomingWindow.subtract(clientsCredit).compareTo(clientsCredit) >= 0)
{
sendFlow();
}
}
}
public UnsignedInteger getOutgoingWindow()
{
return _outgoingWindow;
}
public void receiveFlow(final Flow flow)
{
receivedComplete();
final SequenceNumber flowNextIncomingId = new SequenceNumber(flow.getNextIncomingId() == null
? _initialOutgoingId.intValue()
: flow.getNextIncomingId().intValue());
if (flowNextIncomingId.compareTo(_nextOutgoingId) > 0)
{
final End end = new End();
end.setError(new Error(SessionError.WINDOW_VIOLATION,
String.format("Next incoming id '%d' exceeds next outgoing id '%d'",
flowNextIncomingId.longValue(),
_nextOutgoingId.longValue())));
end(end);
}
else
{
_remoteIncomingWindow = flowNextIncomingId.longValue() + flow.getIncomingWindow().longValue()
- _nextOutgoingId.longValue();
_nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
_remoteOutgoingWindow = flow.getOutgoingWindow();
UnsignedInteger handle = flow.getHandle();
if (handle != null)
{
final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = _inputHandleToEndpoint.get(handle);
if (endpoint == null)
{
End end = new End();
end.setError(new Error(SessionError.UNATTACHED_HANDLE,
String.format("Received Flow with unknown handle %d", handle.intValue())));
end(end);
}
else
{
endpoint.receiveFlow(flow);
}
}
else
{
final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
_inputHandleToEndpoint.values();
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
{
le.flowStateChanged();
}
if (Boolean.TRUE.equals(flow.getEcho()))
{
sendFlow();
}
}
}
}
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
DeliveryRegistry unsettledDeliveries;
if(dispositionRole == Role.RECEIVER)
{
unsettledDeliveries = _outgoingDeliveryRegistry;
}
else
{
unsettledDeliveries = _incomingDeliveryRegistry;
}
SequenceNumber deliveryId = new SequenceNumber(disposition.getFirst().intValue());
SequenceNumber last;
if(disposition.getLast() == null)
{
last = new SequenceNumber(deliveryId.intValue());
}
else
{
last = new SequenceNumber(disposition.getLast().intValue());
}
while(deliveryId.compareTo(last)<=0)
{
UnsignedInteger deliveryIdUnsigned = UnsignedInteger.valueOf(deliveryId.intValue());
UnsettledDelivery unsettledDelivery = unsettledDeliveries.getDelivery(deliveryIdUnsigned);
if(unsettledDelivery != null)
{
LinkEndpoint<?,?> linkEndpoint = unsettledDelivery.getLinkEndpoint();
linkEndpoint.receiveDeliveryState(unsettledDelivery.getDeliveryTag(), disposition.getState(), disposition.getSettled());
if (Boolean.TRUE.equals(disposition.getSettled()))
{
unsettledDeliveries.removeDelivery(deliveryIdUnsigned);
}
}
deliveryId.incr();
}
}
public SessionState getSessionState()
{
return _sessionState;
}
public void sendFlow()
{
sendFlow(new Flow());
}
public void sendFlow(final Flow flow)
{
if(_nextIncomingId != null)
{
flow.setNextIncomingId(_nextIncomingId.unsignedIntegerValue());
_lastSentIncomingLimit = _incomingWindow.add(_nextIncomingId.unsignedIntegerValue());
}
flow.setIncomingWindow(_incomingWindow);
flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingId.intValue()));
flow.setOutgoingWindow(_outgoingWindow);
send(flow);
}
public void receiveDetach(final Detach detach)
{
receivedComplete();
UnsignedInteger handle = detach.getHandle();
detach(handle, detach);
}
public void sendAttach(final Attach attach)
{
send(attach);
}
private void send(final FrameBody frameBody)
{
_connection.sendFrame(_sendingChannel, frameBody);
}
public boolean isSyntheticError(final Error error)
{
return error == _sessionEndedLinkError;
}
public void end(final End end)
{
switch (_sessionState)
{
case BEGIN_SENT:
_connection.sendEnd(_sendingChannel, end, false);
_sessionState = SessionState.END_PIPE;
break;
case ACTIVE:
detachLinks();
_connection.sendEnd(_sendingChannel, end, true);
_sessionState = SessionState.END_SENT;
break;
default:
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
_connection.sendEnd(_sendingChannel, reply, true);
break;
}
}
public void receiveTransfer(final Transfer transfer)
{
_nextIncomingId.incr();
_remoteOutgoingWindow = _remoteOutgoingWindow.subtract(UnsignedInteger.ONE);
UnsignedInteger inputHandle = transfer.getHandle();
LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
if (linkEndpoint == null)
{
Error error = new Error();
error.setCondition(SessionError.UNATTACHED_HANDLE);
error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is not attached.");
_connection.close(error);
}
else if(!(linkEndpoint instanceof AbstractReceivingLinkEndpoint))
{
Error error = new Error();
error.setCondition(AmqpError.PRECONDITION_FAILED);
error.setDescription("Received TRANSFER for link handle " + inputHandle + " which is a sending link not a receiving link.");
_connection.close(error);
}
else
{
AbstractReceivingLinkEndpoint endpoint = ((AbstractReceivingLinkEndpoint) linkEndpoint);
endpoint.receiveTransfer(transfer);
}
}
boolean isEnded()
{
return _sessionState == SessionState.ENDED || _connection.isClosed();
}
UnsignedInteger getIncomingWindow()
{
return _incomingWindow;
}
AccessControlContext getAccessControllerContext()
{
return _accessControllerContext;
}
public ReceivingDestination getReceivingDestination(final Link_1_0<?, ?> link,
final Target target) throws AmqpErrorException
{
final ReceivingDestination destination;
if (target != null)
{
if (Boolean.TRUE.equals(target.getDynamic()))
{
MessageDestination tempDestination = createDynamicDestination(link, target);
if(tempDestination != null)
{
target.setAddress(_primaryDomain + tempDestination.getName());
}
else
{
throw new AmqpErrorException(AmqpError.INTERNAL_ERROR, "Cannot create dynamic destination");
}
}
String addr = target.getAddress();
if (addr == null || "".equals(addr.trim()))
{
destination = new AnonymousRelayDestination(getAddressSpace(), target, _connection.getEventLogger());
}
else
{
DestinationAddress destinationAddress = new DestinationAddress(getAddressSpace(), addr, true);
MessageDestination messageDestination = destinationAddress.getMessageDestination();
if (messageDestination != null)
{
destination = new NodeReceivingDestination(destinationAddress,
target.getDurable(),
target.getExpiryPolicy(),
target.getCapabilities(),
_connection.getEventLogger());
}
else
{
destination = null;
}
}
}
else
{
destination = null;
}
if (destination == null)
{
throw new AmqpErrorException(AmqpError.NOT_FOUND,
String.format("Could not find destination for target '%s'", target));
}
return destination;
}
public boolean updateSourceForSubscription(final SendingLinkEndpoint linkEndpoint, final Source newSource,
final SendingDestination newDestination)
{
SendingDestination oldDestination = linkEndpoint.getDestination();
if (oldDestination instanceof ExchangeSendingDestination)
{
ExchangeSendingDestination oldExchangeDestination = (ExchangeSendingDestination) oldDestination;
String newAddress = newSource.getAddress();
if (newDestination instanceof ExchangeSendingDestination)
{
ExchangeSendingDestination newExchangeDestination = (ExchangeSendingDestination) newDestination;
if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue())
{
Source oldSource = linkEndpoint.getSource();
oldSource.setAddress(newAddress);
oldSource.setFilter(newSource.getFilter());
return true;
}
}
}
return false;
}
public SendingDestination getSendingDestination(final Link_1_0<?, ?> link,
final Source source) throws AmqpErrorException
{
SendingDestination destination = null;
if (Boolean.TRUE.equals(source.getDynamic()))
{
MessageSource tempSource = createDynamicSource(link, source);
if(tempSource != null)
{
source.setAddress(_primaryDomain + tempSource.getName());
}
else
{
throw new AmqpErrorException(AmqpError.INTERNAL_ERROR, "Cannot create dynamic source");
}
}
String address = source.getAddress();
if (address != null)
{
if (!address.startsWith("/") && address.contains("/"))
{
destination = createExchangeDestination(address, link.getName(), source);
}
else
{
MessageSource queue = getAddressSpace().getAttainedMessageSource(address);
if (queue != null)
{
destination = new StandardSendingDestination(queue);
}
else
{
destination = createExchangeDestination(address, null, link.getName(), source);
}
}
}
if (destination == null)
{
throw new AmqpErrorException(AmqpError.NOT_FOUND,
String.format("Could not find destination for source '%s'", source));
}
return destination;
}
private ExchangeSendingDestination createExchangeDestination(String address, final String linkName, final Source source)
throws AmqpErrorException
{
String[] parts = address.split("/", 2);
String exchangeName = parts[0];
String bindingKey = parts[1];
return createExchangeDestination(exchangeName, bindingKey, linkName, source);
}
private ExchangeSendingDestination createExchangeDestination(final String exchangeName,
final String bindingKey,
final String linkName,
final Source source) throws AmqpErrorException
{
ExchangeSendingDestination exchangeDestination = null;
Exchange<?> exchange = getExchange(exchangeName);
if (exchange != null)
{
if (!Boolean.TRUE.equals(source.getDynamic()))
{
String remoteContainerId = getConnection().getRemoteContainerId();
exchangeDestination = new ExchangeSendingDestination(exchange, linkName, bindingKey, remoteContainerId, source);
source.setFilter(exchangeDestination.getFilters());
source.setDistributionMode(StdDistMode.COPY);
}
else
{
// TODO
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Temporary subscription is not implemented"));
}
}
return exchangeDestination;
}
private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
final Terminus terminus) throws AmqpErrorException
{
// TODO temporary topics?
final String queueName = "TempQueue" + UUID.randomUUID().toString();
try
{
final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, queueName);
if (terminus.getCapabilities() != null)
{
final Set<Symbol> capabilities = Sets.newHashSet(terminus.getCapabilities());
if (capabilities.contains(Symbol.valueOf("temporary-queue"))
|| capabilities.contains(Symbol.valueOf("temporary-topic")))
{
attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
}
}
return Subject.doAs(getSubjectWithAddedSystemRights(),
(PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes));
}
catch (AccessControlException e)
{
throw new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, e.getMessage());
}
catch (AbstractConfiguredObject.DuplicateNameException e)
{
LOGGER.error("A temporary queue was created with a name which collided with an existing queue name");
throw new ConnectionScopedRuntimeException(e);
}
}
private MessageDestination createDynamicDestination(final Link_1_0<?, ?> link,
final Terminus terminus) throws AmqpErrorException
{
final Symbol[] capabilities = terminus.getCapabilities();
final Set<Symbol> capabilitySet = capabilities == null ? Collections.emptySet() : Sets.newHashSet(capabilities);
boolean isTopic = capabilitySet.contains(Symbol.valueOf("temporary-topic")) || capabilitySet.contains(Symbol.valueOf("topic"));
final String destName = (isTopic ? "TempTopic" : "TempQueue") + UUID.randomUUID().toString();
try
{
final Map<String, Object> attributes = createDynamicNodeAttributes(link, terminus, destName);
Class<? extends MessageDestination> clazz = isTopic ? Exchange.class : MessageDestination.class;
if (isTopic)
{
attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
}
else if (capabilitySet.contains(Symbol.valueOf("temporary-queue")))
{
attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
}
return Subject.doAs(getSubjectWithAddedSystemRights(),
(PrivilegedAction<MessageDestination>) () -> getAddressSpace().createMessageDestination(clazz, attributes));
}
catch (AccessControlException e)
{
throw new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, e.getMessage());
}
catch (AbstractConfiguredObject.DuplicateNameException e)
{
LOGGER.error("A temporary destination was created with a name which collided with an existing destination name '{}'", destName);
throw new ConnectionScopedRuntimeException(e);
}
}
private Map<String, Object> createDynamicNodeAttributes(final Link_1_0<?, ?> link,
final Terminus terminus,
final String nodeName)
{
// TODO convert AMQP 1-0 node properties to queue attributes
final Map<Symbol, Object> properties = terminus.getDynamicNodeProperties();
final TerminusExpiryPolicy expiryPolicy = terminus.getExpiryPolicy();
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
Map<String,Object> attributes = new HashMap<>();
attributes.put(ConfiguredObject.ID, UUID.randomUUID());
attributes.put(ConfiguredObject.NAME, nodeName);
attributes.put(ConfiguredObject.DURABLE, TerminusExpiryPolicy.NEVER.equals(expiryPolicy));
if(lifetimePolicy instanceof DeleteOnNoLinks)
{
attributes.put(ConfiguredObject.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
}
else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
{
attributes.put(ConfiguredObject.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
else if(lifetimePolicy instanceof DeleteOnClose)
{
attributes.put(ConfiguredObject.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CREATING_LINK_CLOSE);
final CreatingLinkInfo linkInfo = new CreatingLinkInfoImpl(link.getRole() == Role.SENDER, link.getRemoteContainerId(), link.getName());
attributes.put("creatingLinkInfo", linkInfo);
}
else if(lifetimePolicy instanceof DeleteOnNoMessages)
{
attributes.put(ConfiguredObject.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
else
{
attributes.put(ConfiguredObject.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
return attributes;
}
ServerTransaction getTransaction(Binary transactionId)
{
final int txnId;
try
{
txnId = transactionIdToInteger(transactionId);
}
catch (IllegalArgumentException e)
{
throw new UnknownTransactionException(e.getMessage());
}
return _connection.getTransaction(txnId);
}
void remoteEnd(End end)
{
Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> associatedLinkEndpoints = new HashSet<>(_associatedLinkEndpoints);
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : associatedLinkEndpoints)
{
linkEndpoint.remoteDetached(new Detach());
linkEndpoint.destroy();
}
_associatedLinkEndpoints.clear();
_connection.sessionEnded(this);
performCloseTasks();
}
static Integer transactionIdToInteger(final Binary txnId)
{
if (txnId == null)
{
throw new UnknownTransactionException("'null' is not a valid transaction-id.");
}
byte[] data = txnId.getArray();
if (data.length > 4)
{
throw new IllegalArgumentException("transaction-id cannot have more than 32-bit.");
}
int id = 0;
for (int i = 0; i < data.length; i++)
{
id <<= 8;
id |= ((int) data[i] & 0xff);
}
return id;
}
static Binary integerToTransactionId(final int txnId)
{
byte[] data = new byte[4];
data[3] = (byte) (txnId & 0xff);
data[2] = (byte) ((txnId & 0xff00) >> 8);
data[1] = (byte) ((txnId & 0xff0000) >> 16);
data[0] = (byte) ((txnId & 0xff000000) >> 24);
return new Binary(data);
}
@Override
public void close()
{
performCloseTasks();
end();
}
private void performCloseTasks()
{
if(_closed.compareAndSet(false, true))
{
List<Action<? super Session_1_0>> taskList = new ArrayList<>(_taskList);
_taskList.clear();
for(Action<? super Session_1_0> task : taskList)
{
task.performAction(this);
}
getAMQPConnection().getEventLogger().message(getLogSubject(), ChannelMessages.CLOSE());
}
}
public void close(ErrorCondition condition, String message)
{
performCloseTasks();
final End end = new End();
final Error theError = new Error();
theError.setDescription(message);
theError.setCondition(condition);
end.setError(theError);
end(end);
}
@Override
public void transportStateChanged()
{
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof SendingLinkEndpoint)
{
ConsumerTarget_1_0 target = ((SendingLinkEndpoint) linkEndpoint).getConsumerTarget();
target.flowStateChanged();
}
}
if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
getAMQPConnection().notifyWork(this);
}
}
@Override
public void block(final Queue<?> queue)
{
getAMQPConnection().doOnIOThreadAsync(() -> doBlock(queue));
}
private void doBlock(final Queue<?> queue)
{
if(_blockingEntities.add(queue))
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint
&& isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(true);
}
}
}
}
private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
{
return (recvDest instanceof NodeReceivingDestination
&& queue == ((NodeReceivingDestination) recvDest).getDestination());
}
@Override
public void unblock(final Queue<?> queue)
{
getAMQPConnection().doOnIOThreadAsync(() -> doUnblock(queue));
}
private void doUnblock(final Queue<?> queue)
{
if(_blockingEntities.remove(queue) && !_blockingEntities.contains(this))
{
if(_blockingEntities.isEmpty())
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
}
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint
&& isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(false);
}
}
}
}
@Override
public void block()
{
getAMQPConnection().doOnIOThreadAsync(this::doBlock);
}
private void doBlock()
{
if(_blockingEntities.add(this))
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint)
{
linkEndpoint.setStopped(true);
}
}
}
}
@Override
public void unblock()
{
getAMQPConnection().doOnIOThreadAsync(this::doUnblock);
}
private void doUnblock()
{
if(_blockingEntities.remove(this))
{
if(_blockingEntities.isEmpty())
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
}
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint
&& !_blockingEntities.contains(((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(false);
}
}
}
}
@Override
public boolean getBlocking()
{
return !_blockingEntities.isEmpty();
}
private void messageWithSubject(final LogMessage operationalLogMessage)
{
getEventLogger().message(getLogSubject(), operationalLogMessage);
}
@Override
public Object getConnectionReference()
{
return getConnection().getReference();
}
@Override
public int getUnacknowledgedMessageCount()
{
return _outgoingDeliveryRegistry.size();
}
@Override
public String toLogString()
{
return getLogSubject().toLogString();
}
public AMQPConnection_1_0<?> getConnection()
{
return _connection;
}
@Override
public void addDeleteTask(final Action<? super Session_1_0> task)
{
// TODO: QPID-7951 is the closed guard important?
if(!_closed.get())
{
super.addDeleteTask(task);
}
}
public Subject getSubject()
{
return _subject;
}
private NamedAddressSpace getAddressSpace()
{
return _connection.getAddressSpace();
}
public SecurityToken getSecurityToken()
{
return _token;
}
@Override
public long getTransactionStartTimeLong()
{
return 0L;
}
@Override
public long getTransactionUpdateTimeLong()
{
return 0L;
}
@Override
protected void updateBlockedStateIfNecessary()
{
}
@Override
public boolean isClosing()
{
return END_STATES.contains(getSessionState()) || getConnection().isClosing();
}
@Override
public String toString()
{
return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
}
public void dissociateEndpoint(LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint)
{
for (Map.Entry<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> entry : _inputHandleToEndpoint.entrySet())
{
if (entry.getValue() == linkEndpoint)
{
_inputHandleToEndpoint.remove(entry.getKey());
break;
}
}
_endpointToOutputHandle.remove(linkEndpoint);
_associatedLinkEndpoints.remove(linkEndpoint);
if (linkEndpoint.getRole() == Role.RECEIVER)
{
getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
}
else
{
getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
}
}
private void detach(UnsignedInteger handle, Detach detach)
{
if(_inputHandleToEndpoint.containsKey(handle))
{
LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = _inputHandleToEndpoint.remove(handle);
endpoint.remoteDetached(detach);
_endpointToOutputHandle.remove(endpoint);
_associatedLinkEndpoints.remove(endpoint);
}
else
{
//TODO: QPID-7954
}
}
private void detachLinks()
{
Collection<UnsignedInteger> handles = new ArrayList<>(_inputHandleToEndpoint.keySet());
for(UnsignedInteger handle : handles)
{
Detach detach = new Detach();
detach.setClosed(false);
detach.setHandle(handle);
detach.setError(_sessionEndedLinkError);
detach(handle, detach);
}
}
private UnsignedInteger findNextAvailableOutputHandle()
{
int i = 0;
do
{
if(!_endpointToOutputHandle.containsValue(UnsignedInteger.valueOf(i)))
{
return UnsignedInteger.valueOf(i);
}
}
while(++i != 0);
return null;
}
private Exchange<?> getExchange(String name)
{
MessageDestination destination = getAddressSpace().getAttainedMessageDestination(name, false);
return destination instanceof Exchange ? (Exchange<?>) destination : null;
}
private Queue<?> getQueue(String name)
{
MessageSource source = getAddressSpace().getAttainedMessageSource(name);
return source instanceof Queue ? (Queue<?>) source : null;
}
private String getPrimaryDomain()
{
String primaryDomain = "";
final List<String> globalAddressDomains = getAddressSpace().getGlobalAddressDomains();
if (globalAddressDomains != null && !globalAddressDomains.isEmpty())
{
primaryDomain = globalAddressDomains.get(0);
if(primaryDomain != null)
{
primaryDomain = primaryDomain.trim();
if(!primaryDomain.endsWith("/"))
{
primaryDomain += "/";
}
}
}
return primaryDomain;
}
DeliveryRegistry getOutgoingDeliveryRegistry()
{
return _outgoingDeliveryRegistry;
}
DeliveryRegistry getIncomingDeliveryRegistry()
{
return _incomingDeliveryRegistry;
}
void receivedComplete()
{
_associatedLinkEndpoints.forEach(linkedEnpoint -> linkedEnpoint.receiveComplete());
}
private void checkMessageDestinationFlowForReceivingLinkEndpoint(final LinkEndpoint<?,?> endpoint)
{
if (endpoint instanceof StandardReceivingLinkEndpoint)
{
final ReceivingDestination destination =
((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination();
if (_blockingEntities.contains(this)
|| _blockingEntities.contains(destination))
{
endpoint.setStopped(true);
}
else if (destination.getMessageDestination() instanceof Queue)
{
Queue<?> queue = (Queue<?>)destination.getMessageDestination();
if (queue.isQueueFlowStopped())
{
queue.checkCapacity();
}
}
}
}
private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
{
private final Attach _attach;
EndpointCreationCallback(final Attach attach)
{
_attach = attach;
}
@Override
public void onSuccess(final T endpoint)
{
doOnIOThreadAsync(() ->
{
_associatedLinkEndpoints.add(endpoint);
_inputHandleToEndpoint.put(_attach.getHandle(), endpoint);
UnsignedInteger nextAvailableOutputHandle = findNextAvailableOutputHandle();
if (nextAvailableOutputHandle == null)
{
endpoint.close(new Error(AmqpError.RESOURCE_LIMIT_EXCEEDED,
String.format(
"Cannot find free handle for endpoint '%s' on session '%s'",
_attach.getName(),
endpoint.getSession().toLogString())));
}
else
{
endpoint.setLocalHandle(nextAvailableOutputHandle);
if (endpoint instanceof ErrantLinkEndpoint)
{
endpoint.sendAttach();
((ErrantLinkEndpoint) endpoint).closeWithError();
}
else
{
if (!_endpointToOutputHandle.containsKey(endpoint))
{
_endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
checkMessageDestinationFlowForReceivingLinkEndpoint(endpoint);
endpoint.sendAttach();
endpoint.start();
}
else
{
final End end = new End();
end.setError(new Error(AmqpError.INTERNAL_ERROR,
"Endpoint is already registered with session."));
endpoint.getSession().end(end);
}
}
}
});
}
@Override
public void onFailure(final Throwable t)
{
String errorMessage = String.format("Failed to create LinkEndpoint in response to Attach: %s", _attach);
LOGGER.error(errorMessage, t);
throw new ConnectionScopedRuntimeException(errorMessage, t);
}
}
}