blob: 9a3900749eff0142eb148fb1a7376351944ced73 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.server.protocol.v1_0.ExchangeDestination.SHARED_CAPABILITY;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.filter.selector.ParseException;
import org.apache.qpid.server.filter.selector.TokenMgrError;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinks;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoLinksOrMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
implements LogSubject, org.apache.qpid.server.util.Deletable<Session_1_0>
{
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private static final EnumSet<SessionState> END_STATES =
EnumSet.of(SessionState.END_RECVD, SessionState.END_PIPE, SessionState.END_SENT, SessionState.ENDED);
private final AMQPConnection_1_0<?> _connection;
private AtomicBoolean _closed = new AtomicBoolean();
private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
private Session<?> _modelObject = this;
private SessionState _sessionState;
private final Map<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
private final Map<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> _inputHandleToEndpoint = new HashMap<>();
private final Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> _associatedLinkEndpoints = new HashSet<>();
private final short _receivingChannel;
private final short _sendingChannel;
private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
private int _nextOutgoingDeliveryId;
private UnsignedInteger _outgoingSessionCredit;
private UnsignedInteger _initialOutgoingId = UnsignedInteger.valueOf(0);
private SequenceNumber _nextIncomingTransferId;
private SequenceNumber _nextOutgoingTransferId = new SequenceNumber(_initialOutgoingId.intValue());
private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
private final int _incomingWindowSize;
private int _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
private UnsignedInteger _lastSentIncomingLimit;
private final Error _sessionEndedLinkError =
new Error(LinkError.DETACH_FORCED,
"Force detach the link because the session is remotely ended.");
private final String _primaryDomain;
private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>());
private volatile long _startedTransactions;
private volatile long _committedTransactions;
private volatile long _rolledBackTransactions;
private volatile int _unacknowledgedMessages;
public Session_1_0(final AMQPConnection_1_0 connection,
Begin begin,
short sendingChannelId,
short receivingChannelId,
int incomingWindowSize)
{
super(connection, sendingChannelId);
_sendingChannel = sendingChannelId;
_receivingChannel = receivingChannelId;
_sessionState = SessionState.ACTIVE;
_nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_primaryDomain = getPrimaryDomain();
_incomingWindowSize = incomingWindowSize;
AccessController.doPrivileged((new PrivilegedAction<Object>()
{
@Override
public Object run()
{
_connection.getEventLogger().message(ChannelMessages.CREATE());
return null;
}
}), _accessControllerContext);
}
public void sendDetach(final Detach detach)
{
send(detach);
}
public void receiveAttach(final Attach attach)
{
if(_sessionState == SessionState.ACTIVE)
{
UnsignedInteger inputHandle = attach.getHandle();
if (_inputHandleToEndpoint.containsKey(inputHandle))
{
String errorMessage = String.format("Input Handle '%d' already in use", inputHandle.intValue());
getConnection().close(new Error(SessionError.HANDLE_IN_USE, errorMessage));
throw new ConnectionScopedRuntimeException(errorMessage);
}
else
{
final Link_1_0<? extends BaseSource, ? extends BaseTarget> link;
if (attach.getRole() == Role.RECEIVER)
{
link = getAddressSpace().getSendingLink(getConnection().getRemoteContainerId(), attach.getName());
}
else
{
link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName());
}
final ListenableFuture<? extends LinkEndpoint<?,?>> future = link.attach(this, attach);
addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
}
}
}
public void updateDisposition(final Role role,
final UnsignedInteger first,
final UnsignedInteger last,
final DeliveryState state, final boolean settled)
{
Disposition disposition = new Disposition();
disposition.setRole(role);
disposition.setFirst(first);
disposition.setLast(last);
disposition.setSettled(settled);
disposition.setState(state);
if (settled)
{
final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
SequenceNumber pos = new SequenceNumber(first.intValue());
SequenceNumber end = new SequenceNumber(last.intValue());
while (pos.compareTo(end) <= 0)
{
unsettled.remove(new UnsignedInteger(pos.intValue()));
pos.incr();
}
}
send(disposition);
//TODO - check send flow
}
public boolean hasCreditToSend()
{
boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
return b && b1;
}
public void end()
{
end(new End());
}
public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
{
_nextOutgoingTransferId.incr();
UnsignedInteger deliveryId;
final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
if (newDelivery)
{
deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
endpoint.setLastDeliveryId(deliveryId);
if (!settled)
{
final Delivery delivery = new Delivery(xfr, endpoint);
_outgoingUnsettled.put(deliveryId, delivery);
_outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
endpoint.addUnsettled(delivery);
}
}
else
{
deliveryId = endpoint.getLastDeliveryId();
final Delivery delivery = _outgoingUnsettled.get(deliveryId);
if (delivery != null)
{
if (!settled)
{
delivery.addTransfer(xfr);
_outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
}
else
{
_outgoingSessionCredit = _outgoingSessionCredit.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
endpoint.settle(delivery.getDeliveryTag());
_outgoingUnsettled.remove(deliveryId);
}
}
}
xfr.setDeliveryId(deliveryId);
try
{
List<QpidByteBuffer> payload = xfr.getPayload();
final long remaining = QpidByteBufferUtils.remaining(payload);
int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
if(payload != null && payloadSent < remaining && payloadSent >= 0)
{
// TODO - should make this iterative and not recursive
Transfer secondTransfer = new Transfer();
secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
secondTransfer.setHandle(xfr.getHandle());
secondTransfer.setSettled(xfr.getSettled());
secondTransfer.setState(xfr.getState());
secondTransfer.setMessageFormat(xfr.getMessageFormat());
secondTransfer.setPayload(payload);
sendTransfer(secondTransfer, endpoint, false);
secondTransfer.dispose();
}
if (payload != null)
{
for (QpidByteBuffer buf : payload)
{
buf.dispose();
}
}
}
catch (OversizeFrameException e)
{
throw new ConnectionScopedRuntimeException(e);
}
}
public boolean isActive()
{
return _sessionState == SessionState.ACTIVE;
}
public void receiveEnd(final End end)
{
switch (_sessionState)
{
case END_SENT:
_sessionState = SessionState.ENDED;
break;
case ACTIVE:
_sessionState = SessionState.END_RECVD;
detachLinks();
remoteEnd(end);
_connection.sendEnd(_sendingChannel, new End(), true);
_sessionState = SessionState.ENDED;
break;
default:
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
_connection.sendEnd(_sendingChannel, reply, true);
break;
}
}
public UnsignedInteger getNextOutgoingId()
{
return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
}
public void sendFlowConditional()
{
if(_nextIncomingTransferId != null)
{
UnsignedInteger clientsCredit =
_lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
// TODO - we should use a better metric here, and/or manage session credit across the whole connection
// send a flow if the window is at least half used up
if (UnsignedInteger.valueOf(_incomingWindowSize).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
{
sendFlow();
}
}
}
public UnsignedInteger getOutgoingWindowSize()
{
return UnsignedInteger.valueOf(_availableOutgoingCredit);
}
public void receiveFlow(final Flow flow)
{
UnsignedInteger handle = flow.getHandle();
final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
final UnsignedInteger nextOutgoingId =
flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
_outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
if (endpoint != null)
{
endpoint.receiveFlow(flow);
}
else
{
final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints = _inputHandleToEndpoint.values();
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
{
le.flowStateChanged();
}
}
}
public void setNextIncomingId(final UnsignedInteger nextIncomingId)
{
_nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
}
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
if(dispositionRole == Role.RECEIVER)
{
unsettledTransfers = _outgoingUnsettled;
}
else
{
unsettledTransfers = _incomingUnsettled;
}
UnsignedInteger deliveryId = disposition.getFirst();
UnsignedInteger last = disposition.getLast();
if(last == null)
{
last = deliveryId;
}
while(deliveryId.compareTo(last)<=0)
{
Delivery delivery = unsettledTransfers.get(deliveryId);
if(delivery != null)
{
delivery.getLinkEndpoint().receiveDeliveryState(delivery,
disposition.getState(),
disposition.getSettled());
if (Boolean.TRUE.equals(disposition.getSettled()))
{
unsettledTransfers.remove(deliveryId);
}
}
deliveryId = deliveryId.add(UnsignedInteger.ONE);
}
if(Boolean.TRUE.equals(disposition.getSettled()))
{
//TODO - check send flow
}
}
public SessionState getSessionState()
{
return _sessionState;
}
public void sendFlow()
{
sendFlow(new Flow());
}
public void sendFlow(final Flow flow)
{
if(_nextIncomingTransferId != null)
{
final int nextIncomingId = _nextIncomingTransferId.intValue();
flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
_lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindowSize);
}
flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindowSize));
flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
send(flow);
}
public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
{
_outgoingSessionCredit = outgoingSessionCredit;
}
public void receiveDetach(final Detach detach)
{
UnsignedInteger handle = detach.getHandle();
detach(handle, detach);
}
public void sendAttach(final Attach attach)
{
send(attach);
}
private void send(final FrameBody frameBody)
{
_connection.sendFrame(_sendingChannel, frameBody);
}
public boolean isSyntheticError(final Error error)
{
return error == _sessionEndedLinkError;
}
public void end(final End end)
{
switch (_sessionState)
{
case BEGIN_SENT:
_connection.sendEnd(_sendingChannel, end, false);
_sessionState = SessionState.END_PIPE;
break;
case ACTIVE:
detachLinks();
short sendChannel = _sendingChannel;
_connection.sendEnd(sendChannel, end, true);
_sessionState = SessionState.END_SENT;
break;
default:
sendChannel = _sendingChannel;
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("END called on Session which has not been opened");
reply.setError(error);
_connection.sendEnd(sendChannel, reply, true);
break;
}
}
public void receiveTransfer(final Transfer transfer)
{
_nextIncomingTransferId.incr();
UnsignedInteger inputHandle = transfer.getHandle();
LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
if (linkEndpoint == null)
{
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is not attached");
_connection.close(error);
}
else if(!(linkEndpoint instanceof AbstractReceivingLinkEndpoint))
{
Error error = new Error();
error.setCondition(ConnectionError.FRAMING_ERROR);
error.setDescription("TRANSFER called on Session for link handle " + inputHandle + " which is a sending ink not a receiving link");
_connection.close(error);
}
else
{
AbstractReceivingLinkEndpoint endpoint = ((AbstractReceivingLinkEndpoint) linkEndpoint);
UnsignedInteger deliveryId = transfer.getDeliveryId();
if (deliveryId == null)
{
deliveryId = endpoint.getLastDeliveryId();
}
Delivery delivery = _incomingUnsettled.get(deliveryId);
if (delivery == null)
{
delivery = new Delivery(transfer, endpoint);
_incomingUnsettled.put(deliveryId, delivery);
if (Boolean.TRUE.equals(transfer.getMore()))
{
endpoint.setLastDeliveryId(transfer.getDeliveryId());
}
}
else
{
if (delivery.getDeliveryId().equals(deliveryId))
{
delivery.addTransfer(transfer);
if (!Boolean.TRUE.equals(transfer.getMore()))
{
endpoint.setLastDeliveryId(null);
}
}
else
{
End reply = new End();
Error error = new Error();
error.setCondition(AmqpError.ILLEGAL_STATE);
error.setDescription("TRANSFER called on Session for link handle "
+ inputHandle
+ " with incorrect delivery id "
+ transfer.getDeliveryId());
reply.setError(error);
_connection.sendEnd(_sendingChannel, reply, true);
return;
}
}
Error error = endpoint.receiveTransfer(transfer, delivery);
if(error != null)
{
endpoint.close(error);
}
if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
{
_incomingUnsettled.remove(deliveryId);
}
}
}
boolean isEnded()
{
return _sessionState == SessionState.ENDED || _connection.isClosed();
}
UnsignedInteger getIncomingWindowSize()
{
return UnsignedInteger.valueOf(_incomingWindowSize);
}
AccessControlContext getAccessControllerContext()
{
return _accessControllerContext;
}
public ReceivingDestination getReceivingDestination(final Target target) throws AmqpErrorException
{
final ReceivingDestination destination;
if (target != null)
{
if (Boolean.TRUE.equals(target.getDynamic()))
{
MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
target.setAddress(tempQueue.getName());
}
String addr = target.getAddress();
if (addr == null || "".equals(addr.trim()))
{
MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
target.getExpiryPolicy(), "",
target.getCapabilities(),
_connection.getEventLogger());
}
else if (!addr.startsWith("/") && addr.contains("/"))
{
String[] parts = addr.split("/", 2);
Exchange<?> exchange = getExchange(parts[0]);
if (exchange != null)
{
Symbol[] capabilities1 = target.getCapabilities();
ExchangeDestination exchangeDestination = new ExchangeDestination(exchange,
null,
target.getDurable(),
target.getExpiryPolicy(),
parts[0],
parts[1],
capabilities1 != null ? Arrays.asList(capabilities1) : Collections.<Symbol>emptyList());
destination = exchangeDestination;
}
else
{
destination = null;
}
}
else
{
MessageDestination messageDestination =
getAddressSpace().getAttainedMessageDestination(addr);
if (messageDestination != null)
{
destination =
new NodeReceivingDestination(messageDestination,
target.getDurable(),
target.getExpiryPolicy(),
addr,
target.getCapabilities(),
_connection.getEventLogger());
}
else
{
Queue<?> queue = getQueue(addr);
if (queue != null)
{
destination = new QueueDestination(queue, addr);
}
else
{
destination = null;
}
}
}
}
else
{
destination = null;
}
if (destination != null)
{
target.setCapabilities(destination.getCapabilities());
}
else
{
throw new AmqpErrorException(AmqpError.NOT_FOUND,
String.format("Could not find destination for target '%s'", target));
}
return destination;
}
public boolean updateSourceForSubscription(final SendingLinkEndpoint linkEndpoint, final Source newSource,
final SendingDestination newDestination)
{
SendingDestination oldDestination = linkEndpoint.getDestination();
if (oldDestination instanceof ExchangeDestination)
{
ExchangeDestination oldExchangeDestination = (ExchangeDestination) oldDestination;
String newAddress = newSource.getAddress();
if (newDestination instanceof ExchangeDestination)
{
ExchangeDestination newExchangeDestination = (ExchangeDestination) newDestination;
if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue())
{
Source oldSource = linkEndpoint.getSource();
oldSource.setAddress(newAddress);
oldSource.setFilter(newSource.getFilter());
return true;
}
}
}
return false;
}
public SendingDestination getSendingDestination(final String linkName, final Source source) throws AmqpErrorException
{
SendingDestination destination = null;
if (Boolean.TRUE.equals(source.getDynamic()))
{
MessageSource tempQueue = createDynamicSource(source.getDynamicNodeProperties());
source.setAddress(tempQueue.getName()); // todo : temporary topic
}
String address = source.getAddress();
if (address != null)
{
if (!address.startsWith("/") && address.contains("/"))
{
destination = createExchangeDestination(address, linkName, source);
}
else
{
MessageSource queue = getAddressSpace().getAttainedMessageSource(address);
if (queue != null)
{
destination = new MessageSourceDestination(queue);
}
else
{
destination = createExchangeDestination(address, null, linkName, source);
}
}
}
if (destination == null)
{
throw new AmqpErrorException(AmqpError.NOT_FOUND,
String.format("Could not find destination for source '%s'", source));
}
return destination;
}
private ExchangeDestination createExchangeDestination(String address, final String linkName, final Source source)
throws AmqpErrorException
{
String[] parts = address.split("/", 2);
String exchangeName = parts[0];
String bindingKey = parts[1];
return createExchangeDestination(exchangeName, bindingKey, linkName, source);
}
private ExchangeDestination createExchangeDestination(final String exchangeName,
final String bindingKey,
final String linkName,
final Source source) throws AmqpErrorException
{
ExchangeDestination exchangeDestination = null;
Exchange<?> exchange = getExchange(exchangeName);
List<Symbol> sourceCapabilities = new ArrayList<>();
if (exchange != null)
{
Queue queue = null;
if (!Boolean.TRUE.equals(source.getDynamic()))
{
final Map<String, Object> attributes = new HashMap<>();
boolean isDurable = source.getExpiryPolicy() == TerminusExpiryPolicy.NEVER;
boolean isShared = hasCapability(source.getCapabilities(), SHARED_CAPABILITY);
boolean isGlobal = hasCapability(source.getCapabilities(), ExchangeDestination.GLOBAL_CAPABILITY);
final String name = getMangledSubscriptionName(linkName, isDurable, isShared, isGlobal);
if (isGlobal)
{
sourceCapabilities.add(ExchangeDestination.GLOBAL_CAPABILITY);
}
ExclusivityPolicy exclusivityPolicy;
if (isShared)
{
exclusivityPolicy = ExclusivityPolicy.SHARED_SUBSCRIPTION;
sourceCapabilities.add(SHARED_CAPABILITY);
}
else
{
exclusivityPolicy = ExclusivityPolicy.LINK;
}
org.apache.qpid.server.model.LifetimePolicy lifetimePolicy = getLifetimePolicy(source.getExpiryPolicy());
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, name);
attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
attributes.put(Queue.DURABLE, isDurable);
BindingInfo bindingInfo = new BindingInfo(exchange, name,
bindingKey, source.getFilter());
Map<String, Map<String, Object>> bindings = bindingInfo.getBindings();
try
{
if (getAddressSpace() instanceof QueueManagingVirtualHost)
{
try
{
queue = ((QueueManagingVirtualHost) getAddressSpace()).getSubscriptionQueue(exchangeName, attributes, bindings);
}
catch (NotFoundException e)
{
throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, e.getMessage()));
}
}
else
{
throw new AmqpErrorException(new Error(AmqpError.INTERNAL_ERROR,
"Address space of unexpected type"));
}
}
catch(IllegalStateException e)
{
throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED,
"Subscription is already in use"));
}
source.setFilter(bindingInfo.getActualFilters().isEmpty() ? null : bindingInfo.getActualFilters());
source.setDistributionMode(StdDistMode.COPY);
exchangeDestination = new ExchangeDestination(exchange,
queue,
source.getDurable(),
source.getExpiryPolicy(),
exchangeName,
bindingKey,
sourceCapabilities);
}
else
{
// TODO
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Temporary subscription is not implemented"));
}
}
return exchangeDestination;
}
private org.apache.qpid.server.model.LifetimePolicy getLifetimePolicy(final TerminusExpiryPolicy expiryPolicy) throws AmqpErrorException
{
org.apache.qpid.server.model.LifetimePolicy lifetimePolicy;
if (expiryPolicy == null || expiryPolicy == TerminusExpiryPolicy.SESSION_END)
{
lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_SESSION_END;
}
else if (expiryPolicy == TerminusExpiryPolicy.LINK_DETACH)
{
lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
}
else if (expiryPolicy == TerminusExpiryPolicy.CONNECTION_CLOSE)
{
lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
}
else if (expiryPolicy == TerminusExpiryPolicy.NEVER)
{
lifetimePolicy = org.apache.qpid.server.model.LifetimePolicy.PERMANENT;
}
else
{
Error error = new Error(AmqpError.NOT_IMPLEMENTED,
String.format("unknown ExpiryPolicy '%s'", expiryPolicy.getValue()));
throw new AmqpErrorException(error);
}
return lifetimePolicy;
}
private String getMangledSubscriptionName(final String linkName,
final boolean isDurable,
final boolean isShared,
final boolean isGlobal)
{
String remoteContainerId = getConnection().getRemoteContainerId();
if (isGlobal)
{
remoteContainerId = "_global_";
}
else
{
remoteContainerId = sanitizeName(remoteContainerId);
}
String subscriptionName;
if (!isDurable && !isShared)
{
subscriptionName = UUID.randomUUID().toString();
}
else
{
subscriptionName = linkName;
if (isShared)
{
int separator = subscriptionName.indexOf("|");
if (separator > 0)
{
subscriptionName = subscriptionName.substring(0, separator);
}
}
subscriptionName = sanitizeName(subscriptionName);
}
return "qpidsub_/" + remoteContainerId + "_/" + subscriptionName + "_/" + (isDurable
? "durable"
: "nondurable");
}
private String sanitizeName(String name)
{
return name.replace("_", "__")
.replace(".", "_:")
.replace("(", "_O")
.replace(")", "_C")
.replace("<", "_L")
.replace(">", "_R");
}
private boolean hasCapability(final Symbol[] capabilities,
final Symbol expectedCapability)
{
if (capabilities != null)
{
for (Symbol capability : capabilities)
{
if (expectedCapability.equals(capability))
{
return true;
}
}
}
return false;
}
private MessageSource createDynamicSource(Map properties)
{
final String queueName = _primaryDomain + "TempQueue" + UUID.randomUUID().toString();
MessageSource queue = null;
try
{
Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(properties, queueName);
queue = getAddressSpace().createMessageSource(MessageSource.class, attributes);
}
catch (AccessControlException e)
{
Error error = new Error();
error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
error.setDescription(e.getMessage());
_connection.close(error);
}
catch (AbstractConfiguredObject.DuplicateNameException e)
{
_logger.error("A temporary queue was created with a name which collided with an existing queue name");
throw new ConnectionScopedRuntimeException(e);
}
return queue;
}
private MessageDestination createDynamicDestination(Map properties)
{
final String queueName = _primaryDomain + "TempQueue" + UUID.randomUUID().toString();
MessageDestination queue = null;
try
{
Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(properties, queueName);
queue = getAddressSpace().createMessageDestination(MessageDestination.class, attributes);
}
catch (AccessControlException e)
{
Error error = new Error();
error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
error.setDescription(e.getMessage());
_connection.close(error);
}
catch (AbstractConfiguredObject.DuplicateNameException e)
{
_logger.error("A temporary queue was created with a name which collided with an existing queue name");
throw new ConnectionScopedRuntimeException(e);
}
return queue;
}
private Map<String, Object> convertDynamicNodePropertiesToAttributes(final Map properties, final String queueName)
{
// TODO convert AMQP 1-0 node properties to queue attributes
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.DURABLE, false);
if(lifetimePolicy instanceof DeleteOnNoLinks)
{
attributes.put(Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
}
else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
{
attributes.put(Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
else if(lifetimePolicy instanceof DeleteOnClose)
{
attributes.put(Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
else if(lifetimePolicy instanceof DeleteOnNoMessages)
{
attributes.put(Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
else
{
attributes.put(Queue.LIFETIME_POLICY,
org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
return attributes;
}
ServerTransaction getTransaction(Binary transactionId)
{
// TODO - deal with the case where the txn id is invalid
return _connection.getTransaction(binaryToInteger(transactionId));
}
void remoteEnd(End end)
{
Set<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> associatedLinkEndpoints = new HashSet<>(_associatedLinkEndpoints);
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : associatedLinkEndpoints)
{
linkEndpoint.remoteDetached(new Detach());
linkEndpoint.destroy();
}
_associatedLinkEndpoints.clear();
_connection.sessionEnded(this);
performCloseTasks();
if(_modelObject != null)
{
_modelObject.delete();
}
}
Integer binaryToInteger(final Binary txnId)
{
if(txnId == null)
{
return null;
}
byte[] data = txnId.getArray();
if(data.length > 4)
{
throw new IllegalArgumentException();
}
int id = 0;
for(int i = 0; i < data.length; i++)
{
id <<= 8;
id |= ((int)data[i] & 0xff);
}
return id;
}
Binary integerToBinary(final int txnId)
{
byte[] data = new byte[4];
data[3] = (byte) (txnId & 0xff);
data[2] = (byte) ((txnId & 0xff00) >> 8);
data[1] = (byte) ((txnId & 0xff0000) >> 16);
data[0] = (byte) ((txnId & 0xff000000) >> 24);
return new Binary(data);
}
@Override
public void close()
{
performCloseTasks();
end();
if(_modelObject != null)
{
_modelObject.delete();
}
}
private void performCloseTasks()
{
if(_closed.compareAndSet(false, true))
{
List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList);
_taskList.clear();
for(Action<? super Session_1_0> task : taskList)
{
task.performAction(this);
}
getAMQPConnection().getEventLogger().message(_logSubject,ChannelMessages.CLOSE());
}
}
public void close(ErrorCondition condition, String message)
{
performCloseTasks();
final End end = new End();
final Error theError = new Error();
theError.setDescription(message);
theError.setCondition(condition);
end.setError(theError);
end(end);
}
@Override
public void transportStateChanged()
{
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof SendingLinkEndpoint)
{
ConsumerTarget_1_0 target = ((SendingLinkEndpoint) linkEndpoint).getConsumerTarget();
target.flowStateChanged();
}
}
if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
getAMQPConnection().notifyWork(this);
}
}
@Override
public void block(final Queue<?> queue)
{
getAMQPConnection().doOnIOThreadAsync(
new Runnable()
{
@Override
public void run()
{
doBlock(queue);
}
});
}
private void doBlock(final Queue<?> queue)
{
if(_blockingEntities.add(queue))
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint
&& isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(true);
}
}
}
}
private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
{
return (recvDest instanceof NodeReceivingDestination
&& queue == ((NodeReceivingDestination) recvDest).getDestination())
|| recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
}
@Override
public void unblock(final Queue<?> queue)
{
getAMQPConnection().doOnIOThreadAsync(
new Runnable()
{
@Override
public void run()
{
doUnblock(queue);
}
});
}
private void doUnblock(final Queue<?> queue)
{
if(_blockingEntities.remove(queue) && !_blockingEntities.contains(this))
{
if(_blockingEntities.isEmpty())
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
}
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint
&& isQueueDestinationForLink(queue, ((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(false);
}
}
}
}
@Override
public void block()
{
getAMQPConnection().doOnIOThreadAsync(
new Runnable()
{
@Override
public void run()
{
doBlock();
}
});
}
private void doBlock()
{
if(_blockingEntities.add(this))
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint)
{
linkEndpoint.setStopped(true);
}
}
}
}
@Override
public void unblock()
{
getAMQPConnection().doOnIOThreadAsync(
new Runnable()
{
@Override
public void run()
{
doUnblock();
}
});
}
private void doUnblock()
{
if(_blockingEntities.remove(this))
{
if(_blockingEntities.isEmpty())
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
}
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof StandardReceivingLinkEndpoint
&& !_blockingEntities.contains(((StandardReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
{
linkEndpoint.setStopped(false);
}
}
}
}
@Override
public boolean getBlocking()
{
return !_blockingEntities.isEmpty();
}
private void messageWithSubject(final LogMessage operationalLogMessage)
{
getEventLogger().message(_logSubject, operationalLogMessage);
}
@Override
public Object getConnectionReference()
{
return getConnection().getReference();
}
@Override
public int getUnacknowledgedMessageCount()
{
return _unacknowledgedMessages;
}
@Override
public long getTxnStart()
{
return _startedTransactions;
}
@Override
public long getTxnCommits()
{
return _committedTransactions;
}
@Override
public long getTxnRejects()
{
return _rolledBackTransactions;
}
@Override
public long getConsumerCount()
{
// TODO - fix statistic - need to count consumers
return -1;
}
@Override
public String toLogString()
{
final AMQPConnection<?> amqpConnection = getAMQPConnection();
long connectionId = amqpConnection.getConnectionId();
String remoteAddress = amqpConnection.getRemoteAddressString();
final String authorizedPrincipal = amqpConnection.getAuthorizedPrincipal() == null ? "?" : amqpConnection.getAuthorizedPrincipal().getName();
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
connectionId,
authorizedPrincipal,
remoteAddress,
getAddressSpace().getName(),
_sendingChannel) + "] ";
}
public AMQPConnection_1_0<?> getConnection()
{
return _connection;
}
@Override
public void addDeleteTask(final Action<? super Session_1_0> task)
{
// TODO is the closed guard important?
if(!_closed.get())
{
super.addDeleteTask(task);
}
}
public Subject getSubject()
{
return _subject;
}
private NamedAddressSpace getAddressSpace()
{
return _connection.getAddressSpace();
}
public SecurityToken getSecurityToken()
{
return _token;
}
@Override
public Collection<Consumer<?, ConsumerTarget_1_0>> getConsumers()
{
return Collections.unmodifiableCollection(_consumers);
}
@Override
public long getTransactionStartTimeLong()
{
return 0L;
}
@Override
public long getTransactionUpdateTimeLong()
{
return 0L;
}
@Override
protected void updateBlockedStateIfNecessary()
{
}
@Override
public boolean isClosing()
{
return END_STATES.contains(getSessionState());
}
@Override
public void doTimeoutAction(final String reason)
{
getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
void incrementStartedTransactions()
{
_startedTransactions++;
}
void incrementCommittedTransactions()
{
_committedTransactions++;
}
void incrementRolledBackTransactions()
{
_rolledBackTransactions++;
}
void incrementUnacknowledged()
{
_unacknowledgedMessages++;
}
void decrementUnacknowledged()
{
_unacknowledgedMessages--;
}
@Override
public String toString()
{
return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
}
public void dissociateEndpoint(LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint)
{
for (Map.Entry<UnsignedInteger, LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> entry : _inputHandleToEndpoint.entrySet())
{
if (entry.getValue() == linkEndpoint)
{
_inputHandleToEndpoint.remove(entry.getKey());
break;
}
}
_endpointToOutputHandle.remove(linkEndpoint);
_associatedLinkEndpoints.remove(linkEndpoint);
}
private void detach(UnsignedInteger handle, Detach detach)
{
if(_inputHandleToEndpoint.containsKey(handle))
{
LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = _inputHandleToEndpoint.remove(handle);
endpoint.remoteDetached(detach);
_endpointToOutputHandle.remove(endpoint);
}
else
{
// TODO
}
}
private void detachLinks()
{
Collection<UnsignedInteger> handles = new ArrayList<>(_inputHandleToEndpoint.keySet());
for(UnsignedInteger handle : handles)
{
Detach detach = new Detach();
detach.setClosed(false);
detach.setHandle(handle);
detach.setError(_sessionEndedLinkError);
detach(handle, detach);
}
}
private UnsignedInteger findNextAvailableOutputHandle()
{
int i = 0;
do
{
if(!_endpointToOutputHandle.containsValue(UnsignedInteger.valueOf(i)))
{
return UnsignedInteger.valueOf(i);
}
}
while(++i != 0);
// TODO
throw new RuntimeException();
}
private Exchange<?> getExchange(String name)
{
MessageDestination destination = getAddressSpace().getAttainedMessageDestination(name);
return destination instanceof Exchange ? (Exchange<?>) destination : null;
}
private Queue<?> getQueue(String name)
{
MessageSource source = getAddressSpace().getAttainedMessageSource(name);
return source instanceof Queue ? (Queue<?>) source : null;
}
private String getPrimaryDomain()
{
String primaryDomain = "";
final List<String> globalAddressDomains = getAddressSpace().getGlobalAddressDomains();
if (globalAddressDomains != null && !globalAddressDomains.isEmpty())
{
primaryDomain = globalAddressDomains.get(0);
if(primaryDomain != null)
{
primaryDomain = primaryDomain.trim();
if(!primaryDomain.endsWith("/"))
{
primaryDomain += "/";
}
}
}
return primaryDomain;
}
private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
{
private final Attach _attach;
EndpointCreationCallback(final Attach attach)
{
_attach = attach;
}
@Override
public void onSuccess(final T endpoint)
{
doOnIOThreadAsync(new Runnable()
{
@Override
public void run()
{
_associatedLinkEndpoints.add(endpoint);
endpoint.setLocalHandle(findNextAvailableOutputHandle());
if (endpoint instanceof ErrantLinkEndpoint)
{
endpoint.sendAttach();
((ErrantLinkEndpoint) endpoint).closeWithError();
}
else
{
if (endpoint instanceof StandardReceivingLinkEndpoint
&& (_blockingEntities.contains(Session_1_0.this)
|| _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination())))
{
endpoint.setStopped(true);
}
_inputHandleToEndpoint.put(_attach.getHandle(), endpoint);
if (!_endpointToOutputHandle.containsKey(endpoint))
{
_endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
endpoint.sendAttach();
endpoint.start();
}
else
{
// TODO - link stealing???
}
}
}
});
}
@Override
public void onFailure(final Throwable t)
{
String errorMessage = String.format("Failed to create LinkEndpoint in response to Attach: %s", _attach);
_logger.error(errorMessage, t);
throw new ConnectionScopedRuntimeException(errorMessage, t);
}
}
private final class BindingInfo
{
private final Map<Symbol, Filter> _actualFilters = new HashMap<>();
private final Map<String, Map<String, Object>> _bindings = new HashMap<>();
private BindingInfo(Exchange<?> exchange,
final String queueName,
String bindingKey,
Map<Symbol, Filter> filters) throws AmqpErrorException
{
String binding = null;
final Map<String, Object> arguments = new HashMap<>();
if (filters != null && !filters.isEmpty())
{
boolean hasBindingFilter = false;
boolean hasMessageFilter = false;
for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
{
if(!hasBindingFilter
&& entry.getValue() instanceof ExactSubjectFilter
&& exchange.getType().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
ExactSubjectFilter filter = (ExactSubjectFilter) entry.getValue();
binding = filter.getValue();
_actualFilters.put(entry.getKey(), filter);
hasBindingFilter = true;
}
else if(!hasBindingFilter
&& entry.getValue() instanceof MatchingSubjectFilter
&& exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
MatchingSubjectFilter filter = (MatchingSubjectFilter) entry.getValue();
binding = filter.getValue();
_actualFilters.put(entry.getKey(), filter);
hasBindingFilter = true;
}
else if(entry.getValue() instanceof NoLocalFilter)
{
_actualFilters.put(entry.getKey(), entry.getValue());
arguments.put(AMQPFilterTypes.NO_LOCAL.toString(), true);
}
else if (!hasMessageFilter
&& entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
{
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter =
(org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
// TODO: QPID-7642 - due to inconsistent handling of invalid filters
// by different exchange implementations
// we need to validate filter before creation of binding
try
{
new org.apache.qpid.server.filter.JMSSelectorFilter(selectorFilter.getValue());
}
catch (ParseException | SelectorParsingException | TokenMgrError e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
throw new AmqpErrorException(error);
}
arguments.put(AMQPFilterTypes.JMS_SELECTOR.toString(), selectorFilter.getValue());
_actualFilters.put(entry.getKey(), selectorFilter);
hasMessageFilter = true;
}
}
}
if(binding != null)
{
_bindings.put(binding, arguments);
}
if(bindingKey != null)
{
_bindings.put(bindingKey, arguments);
}
if(binding == null
&& bindingKey == null
&& exchange.getType().equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
{
_bindings.put(queueName, arguments);
}
else if(binding == null
&& bindingKey == null
&& exchange.getType().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
_bindings.put("#", arguments);
}
}
private Map<Symbol, Filter> getActualFilters()
{
return _actualFilters;
}
private Map<String, Map<String, Object>> getBindings()
{
return _bindings;
}
@Override
public boolean equals(final Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
final BindingInfo that = (BindingInfo) o;
return _actualFilters.equals(that._actualFilters) && _bindings.equals(that._bindings);
}
@Override
public int hashCode()
{
int result = _actualFilters.hashCode();
result = 31 * result + _bindings.hashCode();
return result;
}
}
}