| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.protocol.v0_8; |
| |
| import static org.apache.qpid.transport.util.Functions.hex; |
| |
| import java.security.AccessControlContext; |
| import java.security.AccessControlException; |
| import java.security.AccessController; |
| import java.security.PrivilegedAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.security.auth.Subject; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.AMQConnectionException; |
| import org.apache.qpid.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.common.AMQPFilterTypes; |
| import org.apache.qpid.exchange.ExchangeDefaults; |
| import org.apache.qpid.framing.*; |
| import org.apache.qpid.protocol.AMQConstant; |
| import org.apache.qpid.server.connection.SessionPrincipal; |
| import org.apache.qpid.server.consumer.ConsumerImpl; |
| import org.apache.qpid.server.consumer.ConsumerTarget; |
| import org.apache.qpid.server.filter.AMQInvalidArgumentException; |
| import org.apache.qpid.server.filter.ArrivalTimeFilter; |
| import org.apache.qpid.server.filter.FilterManager; |
| import org.apache.qpid.server.filter.FilterManagerFactory; |
| import org.apache.qpid.server.filter.Filterable; |
| import org.apache.qpid.server.filter.MessageFilter; |
| import org.apache.qpid.server.flow.FlowCreditManager; |
| import org.apache.qpid.server.logging.EventLogger; |
| import org.apache.qpid.server.logging.EventLoggerProvider; |
| import org.apache.qpid.server.logging.LogMessage; |
| import org.apache.qpid.server.logging.LogSubject; |
| import org.apache.qpid.server.logging.messages.ChannelMessages; |
| import org.apache.qpid.server.logging.messages.ExchangeMessages; |
| import org.apache.qpid.server.logging.subjects.ChannelLogSubject; |
| import org.apache.qpid.server.message.InstanceProperties; |
| import org.apache.qpid.server.message.MessageDestination; |
| import org.apache.qpid.server.message.MessageInstance; |
| import org.apache.qpid.server.message.MessageReference; |
| import org.apache.qpid.server.message.MessageSource; |
| import org.apache.qpid.server.message.ServerMessage; |
| import org.apache.qpid.server.model.Broker; |
| import org.apache.qpid.server.model.ConfigurationChangeListener; |
| import org.apache.qpid.server.model.ConfiguredObject; |
| import org.apache.qpid.server.model.Connection; |
| import org.apache.qpid.server.model.Consumer; |
| import org.apache.qpid.server.model.Exchange; |
| import org.apache.qpid.server.model.ExclusivityPolicy; |
| import org.apache.qpid.server.model.LifetimePolicy; |
| import org.apache.qpid.server.model.NamedAddressSpace; |
| import org.apache.qpid.server.model.NoFactoryForTypeException; |
| import org.apache.qpid.server.model.Queue; |
| import org.apache.qpid.server.model.Session; |
| import org.apache.qpid.server.model.State; |
| import org.apache.qpid.server.model.UnknownConfiguredObjectException; |
| import org.apache.qpid.server.protocol.AMQSessionModel; |
| import org.apache.qpid.server.protocol.CapacityChecker; |
| import org.apache.qpid.server.protocol.ConsumerListener; |
| import org.apache.qpid.server.protocol.PublishAuthorisationCache; |
| import org.apache.qpid.server.queue.QueueArgumentsConverter; |
| import org.apache.qpid.server.security.SecurityToken; |
| import org.apache.qpid.server.store.MessageHandle; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StoredMessage; |
| import org.apache.qpid.server.store.TransactionLogResource; |
| import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; |
| import org.apache.qpid.server.txn.LocalTransaction; |
| import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.util.Action; |
| import org.apache.qpid.server.util.ServerScopedRuntimeException; |
| import org.apache.qpid.server.virtualhost.ExchangeExistsException; |
| import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; |
| import org.apache.qpid.server.virtualhost.QueueExistsException; |
| import org.apache.qpid.server.virtualhost.RequiredExchangeException; |
| import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; |
| import org.apache.qpid.transport.network.Ticker; |
| |
| public class AMQChannel |
| implements AMQSessionModel<AMQChannel>, |
| AsyncAutoCommitTransaction.FutureRecorder, |
| ServerChannelMethodProcessor, |
| EventLoggerProvider |
| { |
| public static final int DEFAULT_PREFETCH = 4096; |
| |
| private static final Logger _logger = LoggerFactory.getLogger(AMQChannel.class); |
| private final DefaultQueueAssociationClearingTask |
| _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask(); |
| |
| private final int _channelId; |
| |
| |
| private final Pre0_10CreditManager _creditManager; |
| private final FlowCreditManager _noAckCreditManager; |
| private final AccessControlContext _accessControllerContext; |
| private final SecurityToken _token; |
| |
| private final PublishAuthorisationCache _publishAuthCahe; |
| |
| |
| |
| /** |
| * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that |
| * value of this represents the <b>last</b> tag sent out |
| */ |
| private long _deliveryTag = 0; |
| |
| /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */ |
| private volatile Queue<?> _defaultQueue; |
| |
| /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ |
| private int _consumerTag; |
| |
| /** |
| * The current message - which may be partial in the sense that not all frames have been received yet - which has |
| * been received by this channel. As the frames are received the message gets updated and once all frames have been |
| * received the message can then be routed. |
| */ |
| private IncomingMessage _currentMessage; |
| |
| /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ |
| private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>(); |
| |
| private final List<ConsumerTarget_0_8> _consumersWithPendingWork = new ArrayList<>(); |
| |
| private final MessageStore _messageStore; |
| |
| private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); |
| |
| private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); |
| |
| private final AtomicBoolean _suspended = new AtomicBoolean(false); |
| |
| private ServerTransaction _transaction; |
| |
| private final AtomicLong _txnStarts = new AtomicLong(0); |
| private final AtomicLong _txnCommits = new AtomicLong(0); |
| private final AtomicLong _txnRejects = new AtomicLong(0); |
| private final AtomicLong _txnCount = new AtomicLong(0); |
| |
| private final AMQPConnection_0_8 _connection; |
| private AtomicBoolean _closing = new AtomicBoolean(false); |
| |
| private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); |
| |
| private final AtomicBoolean _blocking = new AtomicBoolean(false); |
| |
| |
| private LogSubject _logSubject; |
| private volatile boolean _rollingBack; |
| |
| private List<MessageInstance> _resendList = new ArrayList<MessageInstance>(); |
| private static final |
| AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); |
| |
| private final ClientDeliveryMethod _clientDeliveryMethod; |
| |
| private final UUID _id = UUID.randomUUID(); |
| |
| private final List<Action<? super AMQChannel>> _taskList = |
| new CopyOnWriteArrayList<Action<? super AMQChannel>>(); |
| |
| |
| private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); |
| private final ImmediateAction _immediateAction = new ImmediateAction(); |
| private final Subject _subject; |
| private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); |
| private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); |
| private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); |
| private Session<?> _modelObject; |
| private long _blockTime; |
| private long _blockingTimeout; |
| private boolean _confirmOnPublish; |
| private long _confirmedMessageCounter; |
| private volatile long _uncommittedMessageSize; |
| private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); |
| private long _maxUncommittedInMemorySize; |
| |
| private boolean _wireBlockingState; |
| |
| /** Flag recording if this channel has already written operational logging for prefetch size */ |
| private boolean _prefetchLoggedForChannel = false; |
| |
| /** |
| * Handles special case where consumer is polling for messages using qos/flow. Avoids the per-message |
| * production of channel flow and prefetch operational logging. |
| */ |
| private boolean _logChannelFlowMessages = true; |
| |
| private final CachedFrame _txCommitOkFrame; |
| |
/**
 * Creates a channel on the given connection.
 *
 * The channel gets its own {@link Subject}: a copy of the connection's principals and credentials plus a
 * {@link SessionPrincipal} for this channel. The channel starts out non-transactional (auto-commit) and logs
 * a channel CREATE operational message under its own access control context.
 *
 * @param connection   the owning connection
 * @param channelId    the identifier of this channel on the connection
 * @param messageStore the store into which published messages are added
 */
public AMQChannel(AMQPConnection_0_8 connection, int channelId, final MessageStore messageStore)
{
    _connection = connection;
    _channelId = channelId;

    _creditManager = new Pre0_10CreditManager(0L, 0L, connection);
    _noAckCreditManager = new NoAckCreditManager(connection);

    final Subject connectionSubject = connection.getSubject();
    _subject = new Subject(false,
                           connectionSubject.getPrincipals(),
                           connectionSubject.getPublicCredentials(),
                           connectionSubject.getPrivateCredentials());
    _subject.getPrincipals().add(new SessionPrincipal(this));

    _accessControllerContext = connection.getAccessControlContextFromSubject(_subject);

    // The security token comes from the address space when that is a configured object, else from the broker
    final Object addressSpace = _connection.getAddressSpace();
    if (addressSpace instanceof ConfiguredObject)
    {
        _token = ((ConfiguredObject) addressSpace).newToken(_subject);
    }
    else
    {
        _token = _connection.getBroker().newToken(_subject);
    }

    _maxUncommittedInMemorySize =
            connection.getContextProvider().getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
    _logSubject = new ChannelLogSubject(this);

    final Long authCacheTimeout = connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT);
    final Integer authCacheSize = connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE);
    _publishAuthCahe = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);

    _messageStore = messageStore;
    _blockingTimeout = connection.getBroker()
                                 .getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);

    // by default the session is non-transactional
    _transaction = new AsyncAutoCommitTransaction(_messageStore, this);

    // The tx.commit-ok frame is built once here and kept in a CachedFrame
    final AMQMethodBody commitOkBody = _connection.getMethodRegistry().createTxCommitOkBody();
    _txCommitOkFrame = new CachedFrame(commitOkBody.generateFrame(_channelId));

    _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);

    final PrivilegedAction<Void> logCreation = new PrivilegedAction<Void>()
    {
        @Override
        public Void run()
        {
            message(ChannelMessages.CREATE());
            return null;
        }
    };
    AccessController.doPrivileged(logCreation, _accessControllerContext);
}
| |
| @Override |
| public void doTimeoutAction(String reason) |
| { |
| _connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason); |
| } |
| |
| private void message(final LogMessage message) |
| { |
| getEventLogger().message(message); |
| } |
| |
| public AccessControlContext getAccessControllerContext() |
| { |
| return _accessControllerContext; |
| } |
| |
| private boolean performGet(final MessageSource queue, |
| final boolean acks) |
| throws MessageSource.ExistingConsumerPreventsExclusive, |
| MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused |
| { |
| |
| final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); |
| |
| final GetDeliveryMethod getDeliveryMethod = |
| new GetDeliveryMethod(singleMessageCredit, queue); |
| final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() |
| { |
| |
| public void recordMessageDelivery(final ConsumerImpl sub, |
| final MessageInstance entry, |
| final long deliveryTag) |
| { |
| addUnacknowledgedMessage(entry, deliveryTag, null); |
| } |
| }; |
| |
| ConsumerTarget_0_8 target; |
| EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, |
| ConsumerImpl.Option.SEES_REQUEUES); |
| if (acks) |
| { |
| |
| target = ConsumerTarget_0_8.createAckTarget(this, |
| AMQShortString.EMPTY_STRING, null, |
| singleMessageCredit, getDeliveryMethod, getRecordMethod); |
| } |
| else |
| { |
| target = ConsumerTarget_0_8.createGetNoAckTarget(this, |
| AMQShortString.EMPTY_STRING, null, |
| singleMessageCredit, getDeliveryMethod, getRecordMethod); |
| } |
| |
| ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null); |
| sub.flush(); |
| sub.close(); |
| return getDeliveryMethod.hasDeliveredMessage(); |
| |
| |
| } |
| |
| /** Sets this channel to be part of a local transaction */ |
| public void setLocalTransactional() |
| { |
| _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor() |
| { |
| @Override |
| public long getActivityTime() |
| { |
| return _connection.getLastReadTime(); |
| } |
| }); |
| _txnStarts.incrementAndGet(); |
| } |
| |
| public boolean isTransactional() |
| { |
| return _transaction.isTransactional(); |
| } |
| |
| public void receivedComplete() |
| { |
| sync(); |
| } |
| |
| private void incrementOutstandingTxnsIfNecessary() |
| { |
| if(isTransactional()) |
| { |
| //There can currently only be at most one outstanding transaction |
| //due to only having LocalTransaction support. Set value to 1 if 0. |
| _txnCount.compareAndSet(0,1); |
| } |
| } |
| |
| private void decrementOutstandingTxnsIfNecessary() |
| { |
| if(isTransactional()) |
| { |
| //There can currently only be at most one outstanding transaction |
| //due to only having LocalTransaction support. Set value to 0 if 1. |
| _txnCount.compareAndSet(1,0); |
| } |
| } |
| |
| public Long getTxnCommits() |
| { |
| return _txnCommits.get(); |
| } |
| |
| public Long getTxnRejects() |
| { |
| return _txnRejects.get(); |
| } |
| |
| public Long getTxnCount() |
| { |
| return _txnCount.get(); |
| } |
| |
| public Long getTxnStart() |
| { |
| return _txnStarts.get(); |
| } |
| |
| public int getChannelId() |
| { |
| return _channelId; |
| } |
| |
| public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) |
| { |
| _currentMessage = new IncomingMessage(info); |
| _currentMessage.setMessageDestination(e); |
| } |
| |
| public void publishContentHeader(ContentHeaderBody contentHeaderBody) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Content header received on channel " + _channelId); |
| } |
| |
| _currentMessage.setContentHeaderBody(contentHeaderBody); |
| |
| deliverCurrentMessageIfComplete(); |
| } |
| |
| private void deliverCurrentMessageIfComplete() |
| { |
| // check and deliver if header says body length is zero |
| if (_currentMessage.allContentReceived()) |
| { |
| MessagePublishInfo info = _currentMessage.getMessagePublishInfo(); |
| String routingKey = AMQShortString.toString(info.getRoutingKey()); |
| |
| try |
| { |
| final MessageDestination destination = _currentMessage.getDestination(); |
| |
| ContentHeaderBody contentHeader = _currentMessage.getContentHeader(); |
| _connection.checkAuthorizedMessagePrincipal(AMQShortString.toString(contentHeader.getProperties().getUserId())); |
| |
| _publishAuthCahe.authorisePublish(destination, routingKey, info.isImmediate(), _connection.getLastReadTime()); |
| |
| if (_confirmOnPublish) |
| { |
| _confirmedMessageCounter++; |
| } |
| Runnable finallyAction = null; |
| |
| long bodySize = _currentMessage.getSize(); |
| long timestamp = contentHeader.getProperties().getTimestamp(); |
| |
| try |
| { |
| |
| final MessagePublishInfo messagePublishInfo = _currentMessage.getMessagePublishInfo(); |
| |
| final MessageMetaData messageMetaData = |
| new MessageMetaData(messagePublishInfo, |
| contentHeader, |
| getConnection().getLastReadTime()); |
| |
| final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); |
| int bodyCount = _currentMessage.getBodyCount(); |
| if (bodyCount > 0) |
| { |
| for (int i = 0; i < bodyCount; i++) |
| { |
| ContentBody contentChunk = _currentMessage.getContentChunk(i); |
| handle.addContent(contentChunk.getPayload()); |
| contentChunk.dispose(); |
| } |
| } |
| final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded(); |
| |
| final AMQMessage amqMessage = createAMQMessage(storedMessage); |
| MessageReference reference = amqMessage.newReference(); |
| try |
| { |
| |
| _currentMessage = null; |
| |
| |
| final boolean immediate = messagePublishInfo.isImmediate(); |
| |
| final InstanceProperties instanceProperties = |
| new InstanceProperties() |
| { |
| @Override |
| public Object getProperty(final Property prop) |
| { |
| switch (prop) |
| { |
| case EXPIRATION: |
| return amqMessage.getExpiration(); |
| case IMMEDIATE: |
| return immediate; |
| case PERSISTENT: |
| return amqMessage.isPersistent(); |
| case MANDATORY: |
| return messagePublishInfo.isMandatory(); |
| case REDELIVERED: |
| return false; |
| } |
| return null; |
| } |
| }; |
| |
| int enqueues = destination.send(amqMessage, |
| amqMessage.getInitialRoutingAddress(), |
| instanceProperties, _transaction, |
| immediate ? _immediateAction : _capacityCheckAction |
| ); |
| if (enqueues == 0) |
| { |
| finallyAction = handleUnroutableMessage(amqMessage); |
| } |
| else |
| { |
| if (_confirmOnPublish) |
| { |
| BasicAckBody responseBody = _connection.getMethodRegistry() |
| .createBasicAckBody(_confirmedMessageCounter, false); |
| _connection.writeFrame(responseBody.generateFrame(_channelId)); |
| } |
| incrementUncommittedMessageSize(storedMessage); |
| incrementOutstandingTxnsIfNecessary(); |
| } |
| |
| } |
| finally |
| { |
| reference.release(); |
| if (finallyAction != null) |
| { |
| finallyAction.run(); |
| } |
| } |
| |
| } |
| finally |
| { |
| _connection.registerMessageReceived(bodySize, timestamp); |
| _currentMessage = null; |
| } |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| } |
| |
| } |
| |
| } |
| |
| private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle) |
| { |
| if (isTransactional()) |
| { |
| _uncommittedMessageSize += handle.getMetaData().getContentSize(); |
| if (_uncommittedMessageSize > getMaxUncommittedInMemorySize()) |
| { |
| handle.flowToDisk(); |
| if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize()) |
| { |
| messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); |
| } |
| |
| if(!_uncommittedMessages.isEmpty()) |
| { |
| for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages) |
| { |
| uncommittedHandle.flowToDisk(); |
| } |
| _uncommittedMessages.clear(); |
| } |
| } |
| else |
| { |
| _uncommittedMessages.add(handle); |
| } |
| } |
| } |
| |
| /** |
| * Either throws a {@link AMQConnectionException} or returns the message |
| * |
| * Pre-requisite: the current message is judged to have no destination queues. |
| * |
| * @throws AMQConnectionException if the message is mandatory close-on-no-route |
| * @see AMQPConnection_0_8Impl#isCloseWhenNoRoute() |
| */ |
| private Runnable handleUnroutableMessage(AMQMessage message) |
| { |
| boolean mandatory = message.isMandatory(); |
| |
| String exchangeName = AMQShortString.toString(message.getMessagePublishInfo().getExchange()); |
| String routingKey = AMQShortString.toString(message.getMessagePublishInfo().getRoutingKey()); |
| |
| final String description = String.format( |
| "[Exchange: %s, Routing key: %s]", |
| exchangeName, |
| routingKey); |
| |
| boolean closeOnNoRoute = _connection.isCloseWhenNoRoute(); |
| Runnable returnVal = null; |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug(String.format( |
| "Unroutable message %s, mandatory=%s, transactionalSession=%s, closeOnNoRoute=%s", |
| description, mandatory, isTransactional(), closeOnNoRoute)); |
| } |
| |
| if (mandatory && isTransactional() && !_confirmOnPublish && _connection.isCloseWhenNoRoute()) |
| { |
| returnVal = new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| _connection.sendConnectionClose(AMQConstant.NO_ROUTE, |
| "No route for message " + description, _channelId); |
| |
| } |
| }; |
| } |
| else |
| { |
| if (mandatory || message.isImmediate()) |
| { |
| if(_confirmOnPublish) |
| { |
| _connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false))); |
| } |
| _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, |
| "No Route for message " |
| + description, |
| message)); |
| } |
| else |
| { |
| |
| message(ExchangeMessages.DISCARDMSG(exchangeName, routingKey)); |
| } |
| } |
| return returnVal; |
| } |
| |
| public void publishContentBody(ContentBody contentBody) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug(debugIdentity() + " content body received on channel " + _channelId); |
| } |
| |
| try |
| { |
| long currentSize = _currentMessage.addContentBodyFrame(contentBody); |
| if(currentSize > _currentMessage.getSize()) |
| { |
| _connection.sendConnectionClose(AMQConstant.FRAME_ERROR, |
| "More message data received than content header defined", |
| _channelId); |
| } |
| else |
| { |
| deliverCurrentMessageIfComplete(); |
| } |
| } |
| catch (RuntimeException e) |
| { |
| // we want to make sure we don't keep a reference to the message in the |
| // event of an error |
| _currentMessage = null; |
| throw e; |
| } |
| } |
| |
| public long getNextDeliveryTag() |
| { |
| return ++_deliveryTag; |
| } |
| |
| public int getNextConsumerTag() |
| { |
| return ++_consumerTag; |
| } |
| |
| |
| public ConsumerTarget getSubscription(AMQShortString tag) |
| { |
| return _tag2SubscriptionTargetMap.get(tag); |
| } |
| |
| /** |
| * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean |
| * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. |
| * |
| * |
| * @param tag the tag chosen by the client (if null, server will generate one) |
| * @param sources the queues to subscribe to |
| * @param acks Are acks enabled for this subscriber |
| * @param arguments Filters to apply to this subscriber |
| * |
| * @param exclusive Flag requesting exclusive access to the queue |
| * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests |
| */ |
| public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, |
| FieldTable arguments, boolean exclusive, boolean noLocal) |
| throws MessageSource.ExistingConsumerPreventsExclusive, |
| MessageSource.ExistingExclusiveConsumer, |
| AMQInvalidArgumentException, |
| MessageSource.ConsumerAccessRefused, ConsumerTagInUseException |
| { |
| if (tag == null) |
| { |
| tag = new AMQShortString("sgen_" + getNextConsumerTag()); |
| } |
| |
| if (_tag2SubscriptionTargetMap.containsKey(tag)) |
| { |
| throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag); |
| } |
| |
| ConsumerTarget_0_8 target; |
| EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); |
| final boolean multiQueue = sources.size()>1; |
| if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) |
| { |
| target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager, multiQueue); |
| } |
| else if(acks) |
| { |
| target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue); |
| options.add(ConsumerImpl.Option.ACQUIRES); |
| options.add(ConsumerImpl.Option.SEES_REQUEUES); |
| } |
| else |
| { |
| target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager, multiQueue); |
| options.add(ConsumerImpl.Option.ACQUIRES); |
| options.add(ConsumerImpl.Option.SEES_REQUEUES); |
| } |
| |
| if(exclusive) |
| { |
| options.add(ConsumerImpl.Option.EXCLUSIVE); |
| } |
| |
| |
| // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. |
| // We add before we register as the Async Delivery process may AutoClose the subscriber |
| // so calling _cT2QM.remove before we have done put which was after the register succeeded. |
| // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. |
| |
| _tag2SubscriptionTargetMap.put(tag, target); |
| |
| try |
| { |
| FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments)); |
| if(noLocal) |
| { |
| if(filterManager == null) |
| { |
| filterManager = new FilterManager(); |
| } |
| MessageFilter filter = new NoLocalFilter(); |
| filterManager.add(filter.getName(), filter); |
| } |
| |
| if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) |
| { |
| Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString()); |
| final long period; |
| if(value instanceof Number) |
| { |
| period = ((Number)value).longValue(); |
| } |
| else if(value instanceof String) |
| { |
| try |
| { |
| period = Long.parseLong(value.toString()); |
| } |
| catch (NumberFormatException e) |
| { |
| throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); |
| } |
| } |
| else |
| { |
| throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); |
| } |
| |
| final long startingFrom = System.currentTimeMillis() - (1000l * period); |
| if(filterManager == null) |
| { |
| filterManager = new FilterManager(); |
| } |
| MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0); |
| filterManager.add(filter.getName(), filter); |
| |
| } |
| |
| Integer priority = null; |
| if(arguments != null && arguments.containsKey("x-priority")) |
| { |
| Object value = arguments.get("x-priority"); |
| if(value instanceof Number) |
| { |
| priority = ((Number)value).intValue(); |
| } |
| else if(value instanceof String || value instanceof AMQShortString) |
| { |
| try |
| { |
| priority = Integer.parseInt(value.toString()); |
| } |
| catch (NumberFormatException e) |
| { |
| // use default vlaue |
| } |
| } |
| |
| } |
| |
| |
| |
| for(MessageSource source : sources) |
| { |
| ConsumerImpl sub = |
| source.addConsumer(target, |
| filterManager, |
| AMQMessage.class, |
| AMQShortString.toString(tag), |
| options, priority); |
| if (sub instanceof Consumer<?>) |
| { |
| final Consumer<?> modelConsumer = (Consumer<?>) sub; |
| consumerAdded(modelConsumer); |
| modelConsumer.addChangeListener(_consumerClosedListener); |
| _consumers.add(modelConsumer); |
| } |
| } |
| } |
| catch (AccessControlException |
| | MessageSource.ExistingExclusiveConsumer |
| | MessageSource.ExistingConsumerPreventsExclusive |
| | AMQInvalidArgumentException |
| | MessageSource.ConsumerAccessRefused e) |
| { |
| _tag2SubscriptionTargetMap.remove(tag); |
| throw e; |
| } |
| return tag; |
| } |
| |
| /** |
| * Unsubscribe a consumer from a queue. |
| * @param consumerTag |
| * @return true if the consumerTag had a mapped queue that could be unregistered. |
| */ |
| public boolean unsubscribeConsumer(AMQShortString consumerTag) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Unsubscribing consumer '{}' on channel {}", consumerTag, this); |
| } |
| |
| ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); |
| Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers(); |
| if (subs != null) |
| { |
| for(ConsumerImpl sub : subs) |
| { |
| sub.close(); |
| if (sub instanceof Consumer<?>) |
| { |
| _consumers.remove(sub); |
| } |
| } |
| return true; |
| } |
| else |
| { |
| _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."); |
| } |
| return false; |
| } |
| |
| @Override |
| public void close() |
| { |
| close(null, null); |
| } |
| |
| public void close(AMQConstant cause, String message) |
| { |
| if(!_closing.compareAndSet(false, true)) |
| { |
| //Channel is already closing |
| return; |
| } |
| |
| try |
| { |
| unsubscribeAllConsumers(); |
| setDefaultQueue(null); |
| if(_modelObject != null) |
| { |
| _modelObject.delete(); |
| } |
| for (Action<? super AMQChannel> task : _taskList) |
| { |
| task.performAction(this); |
| } |
| |
| _transaction.rollback(); |
| |
| requeue(); |
| } |
| finally |
| { |
| LogMessage operationalLogMessage = cause == null ? |
| ChannelMessages.CLOSE() : |
| ChannelMessages.CLOSE_FORCED(cause.getCode(), message); |
| messageWithSubject(operationalLogMessage); |
| } |
| } |
| |
| private void messageWithSubject(final LogMessage operationalLogMessage) |
| { |
| getEventLogger().message(_logSubject, operationalLogMessage); |
| } |
| |
| private void unsubscribeAllConsumers() |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| if (!_tag2SubscriptionTargetMap.isEmpty()) |
| { |
| _logger.debug("Unsubscribing all consumers on channel " + toString()); |
| } |
| else |
| { |
| _logger.debug("No consumers to unsubscribe on channel " + toString()); |
| } |
| } |
| |
| Set<AMQShortString> subscriptionTags = new HashSet<>(_tag2SubscriptionTargetMap.keySet()); |
| for (AMQShortString tag : subscriptionTags) |
| { |
| unsubscribeConsumer(tag); |
| } |
| } |
| |
| /** |
| * Add a message to the channel-based list of unacknowledged messages |
| * |
| * @param entry the record of the message on the queue that was delivered |
| * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the |
| * delivery tag) |
| * @param consumer The consumer that is to acknowledge this message. |
| */ |
| public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag |
| + ") for " + consumer + " on " + entry.getOwningResource().getName()); |
| } |
| |
| _unacknowledgedMessageMap.add(deliveryTag, entry); |
| |
| } |
| |
| private final String id = "(" + System.identityHashCode(this) + ")"; |
| |
| public String debugIdentity() |
| { |
| return _channelId + id; |
| } |
| |
| /** |
| * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to |
| * this same channel or to other subscribers. |
| * |
| */ |
| private void requeue() |
| { |
| // we must create a new map since all the messages will get a new delivery tag when they are redelivered |
| Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); |
| |
| if (!messagesToBeDelivered.isEmpty()) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString()); |
| } |
| |
| } |
| |
| for (MessageInstance unacked : messagesToBeDelivered) |
| { |
| // Mark message redelivered |
| unacked.setRedelivered(); |
| |
| // Ensure message is released for redelivery |
| unacked.release(unacked.getAcquiringConsumer()); |
| } |
| |
| } |
| |
| /** |
| * Requeue a single message |
| * |
| * @param deliveryTag The message to requeue |
| * |
| */ |
| public void requeue(long deliveryTag) |
| { |
| MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag); |
| |
| if (unacked != null) |
| { |
| // Mark message redelivered |
| unacked.setRedelivered(); |
| |
| // Ensure message is released for redelivery |
| unacked.release(unacked.getAcquiringConsumer()); |
| } |
| else |
| { |
| _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." |
| + _unacknowledgedMessageMap.size()); |
| |
| } |
| |
| } |
| |
| public boolean isMaxDeliveryCountEnabled(final long deliveryTag) |
| { |
| final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag); |
| if (queueEntry != null) |
| { |
| final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount(); |
| return maximumDeliveryCount > 0; |
| } |
| |
| return false; |
| } |
| |
| public boolean isDeliveredTooManyTimes(final long deliveryTag) |
| { |
| final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag); |
| if (queueEntry != null) |
| { |
| final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount(); |
| final int numDeliveries = queueEntry.getDeliveryCount(); |
| return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Called to resend all outstanding unacknowledged messages to this same channel. |
| * |
| */ |
| private void resend() |
| { |
| |
| |
| final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); |
| final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size()); |
| } |
| |
| // Process the Unacked-Map. |
| // Marking messages who still have a consumer for to be resent |
| // and those that don't to be requeued. |
| _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, |
| msgToRequeue, |
| msgToResend |
| )); |
| |
| |
| // Process Messages to Resend |
| if (_logger.isDebugEnabled()) |
| { |
| if (!msgToResend.isEmpty()) |
| { |
| _logger.debug("Preparing (" + msgToResend.size() + ") message to resend."); |
| } |
| else |
| { |
| _logger.debug("No message to resend."); |
| } |
| } |
| |
| for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet()) |
| { |
| MessageInstance message = entry.getValue(); |
| long deliveryTag = entry.getKey(); |
| |
| //Amend the delivery counter as the client hasn't seen these messages yet. |
| message.decrementDeliveryCount(); |
| |
| // Without any details from the client about what has been processed we have to mark |
| // all messages in the unacked map as redelivered. |
| message.setRedelivered(); |
| |
| if (!message.resend()) |
| { |
| msgToRequeue.put(deliveryTag, message); |
| } |
| } // for all messages |
| // } else !isSuspend |
| |
| if (_logger.isDebugEnabled()) |
| { |
| if (!msgToRequeue.isEmpty()) |
| { |
| _logger.debug("Preparing (" + msgToRequeue.size() + ") message to requeue"); |
| } |
| } |
| |
| // Process Messages to Requeue at the front of the queue |
| for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet()) |
| { |
| MessageInstance message = entry.getValue(); |
| long deliveryTag = entry.getKey(); |
| |
| //Amend the delivery counter as the client hasn't seen these messages yet. |
| message.decrementDeliveryCount(); |
| |
| _unacknowledgedMessageMap.remove(deliveryTag); |
| |
| message.setRedelivered(); |
| message.release(message.getAcquiringConsumer()); |
| |
| } |
| } |
| |
| |
| /** |
| * Acknowledge one or more messages. |
| * |
| * @param deliveryTag the last delivery tag |
| * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only |
| * acknowledges the single message specified by the delivery tag |
| * |
| */ |
| private void acknowledgeMessage(long deliveryTag, boolean multiple) |
| { |
| Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple); |
| _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); |
| } |
| |
| private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple) |
| { |
| |
| return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple); |
| |
| } |
| |
| /** |
| * Used only for testing purposes. |
| * |
| * @return the map of unacknowledged messages |
| */ |
| public UnacknowledgedMessageMap getUnacknowledgedMessageMap() |
| { |
| return _unacknowledgedMessageMap; |
| } |
| |
| /** |
| * Called from the ChannelFlowHandler to suspend this Channel |
| * @param suspended boolean, should this Channel be suspended |
| */ |
| public void setSuspended(boolean suspended) |
| { |
| boolean wasSuspended = _suspended.getAndSet(suspended); |
| if (wasSuspended != suspended) |
| { |
| // Log Flow Started before we start the subscriptions |
| if (!suspended && _logChannelFlowMessages) |
| { |
| messageWithSubject(ChannelMessages.FLOW("Started")); |
| } |
| |
| |
| // This section takes two different approaches to perform to perform |
| // the same function. Ensuring that the Subscription has taken note |
| // of the change in Channel State |
| |
| // Here we have become unsuspended and so we ask each the queue to |
| // perform an Async delivery for each of the subscriptions in this |
| // Channel. The alternative would be to ensure that the subscription |
| // had received the change in suspension state. That way the logic |
| // behind deciding to start an async delivery was located with the |
| // Subscription. |
| if (wasSuspended) |
| { |
| // may need to deliver queued messages |
| for (ConsumerTarget_0_8 s : getConsumerTargets()) |
| { |
| for(ConsumerImpl sub : s.getConsumers()) |
| { |
| sub.externalStateChange(); |
| } |
| } |
| } |
| |
| |
| // Here we have become suspended so we need to ensure that each of |
| // the Subscriptions has noticed this change so that we can be sure |
| // they are not still sending messages. Again the code here is a |
| // very simplistic approach to ensure that the change of suspension |
| // has been noticed by each of the Subscriptions. Unlike the above |
| // case we don't actually need to do anything else. |
| if (!wasSuspended) |
| { |
| // may need to deliver queued messages |
| ensureConsumersNoticedStateChange(); |
| } |
| |
| |
| // Log Suspension only after we have confirmed all suspensions are |
| // stopped. |
| if (suspended && _logChannelFlowMessages) |
| { |
| messageWithSubject(ChannelMessages.FLOW("Stopped")); |
| } |
| |
| } |
| } |
| |
| public boolean isSuspended() |
| { |
| return _suspended.get() || _closing.get() || _connection.isClosing(); |
| } |
| |
| |
| public void commit(final Runnable immediateAction, boolean async) |
| { |
| |
| |
| if(async && _transaction instanceof LocalTransaction) |
| { |
| |
| ((LocalTransaction)_transaction).commitAsync(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| immediateAction.run(); |
| } |
| finally |
| { |
| _txnCommits.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| } |
| } |
| }); |
| } |
| else |
| { |
| _transaction.commit(immediateAction); |
| |
| _txnCommits.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| } |
| resetUncommittedMessages(); |
| } |
| |
| private void resetUncommittedMessages() |
| { |
| _uncommittedMessageSize = 0l; |
| _uncommittedMessages.clear(); |
| } |
| |
| private void rollback(Runnable postRollbackTask) |
| { |
| |
| // stop all subscriptions |
| _rollingBack = true; |
| boolean requiresSuspend = _suspended.compareAndSet(false,true); // TODO This is probably superfluous owing to the |
| // message assignment suspended logic in NBC. |
| |
| // ensure all subscriptions have seen the change to the channel state |
| ensureConsumersNoticedStateChange(); |
| |
| try |
| { |
| _transaction.rollback(); |
| } |
| finally |
| { |
| _rollingBack = false; |
| |
| _txnRejects.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| resetUncommittedMessages(); |
| } |
| |
| postRollbackTask.run(); |
| |
| for(MessageInstance entry : _resendList) |
| { |
| ConsumerImpl sub = entry.getAcquiringConsumer(); |
| if (sub == null || sub.isClosed()) |
| { |
| entry.release(sub); |
| } |
| else |
| { |
| entry.resend(); |
| } |
| } |
| _resendList.clear(); |
| |
| if(requiresSuspend) |
| { |
| _suspended.set(false); |
| for(ConsumerTarget_0_8 target : getConsumerTargets()) |
| { |
| for(ConsumerImpl sub : target.getConsumers()) |
| { |
| sub.externalStateChange(); |
| } |
| } |
| |
| } |
| } |
| |
| public String toString() |
| { |
| return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]"; |
| } |
| |
| public boolean isClosing() |
| { |
| return _closing.get(); |
| } |
| |
| public AMQPConnection_0_8 getConnection() |
| { |
| return _connection; |
| } |
| |
| public void setCredit(final long prefetchSize, final int prefetchCount) |
| { |
| if (!_prefetchLoggedForChannel) |
| { |
| message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); |
| _prefetchLoggedForChannel = true; |
| } |
| |
| if (prefetchCount <= 1 && prefetchSize == 0 ) |
| { |
| _logChannelFlowMessages = false; |
| } |
| _creditManager.setCreditLimits(prefetchSize, prefetchCount); |
| } |
| |
| public MessageStore getMessageStore() |
| { |
| return _messageStore; |
| } |
| |
| public ClientDeliveryMethod getClientDeliveryMethod() |
| { |
| return _clientDeliveryMethod; |
| } |
| |
| private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() |
| { |
| |
| public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) |
| { |
| addUnacknowledgedMessage(entry, deliveryTag, sub); |
| } |
| }; |
| |
| public RecordDeliveryMethod getRecordDeliveryMethod() |
| { |
| return _recordDeliveryMethod; |
| } |
| |
| |
| private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle) |
| { |
| |
| AMQMessage message = new AMQMessage(handle, _connection.getReference()); |
| |
| return message; |
| } |
| |
| @Override |
| public UUID getId() |
| { |
| return _id; |
| } |
| |
| @Override |
| public AMQPConnection_0_8 getAMQPConnection() |
| { |
| return _connection; |
| } |
| |
| public LogSubject getLogSubject() |
| { |
| return _logSubject; |
| } |
| |
| @Override |
| public int compareTo(AMQSessionModel o) |
| { |
| return getId().compareTo(o.getId()); |
| } |
| |
| @Override |
| public void addDeleteTask(final Action<? super AMQChannel> task) |
| { |
| _taskList.add(task); |
| } |
| |
| @Override |
| public void removeDeleteTask(final Action<? super AMQChannel> task) |
| { |
| _taskList.remove(task); |
| } |
| |
| public Subject getSubject() |
| { |
| return _subject; |
| } |
| |
| public boolean hasCurrentMessage() |
| { |
| return _currentMessage != null; |
| } |
| |
| public long getMaxUncommittedInMemorySize() |
| { |
| return _maxUncommittedInMemorySize; |
| } |
| |
| private class NoLocalFilter implements MessageFilter |
| { |
| |
| private final Object _connectionReference; |
| |
| public NoLocalFilter() |
| { |
| _connectionReference = getConnectionReference(); |
| } |
| |
| @Override |
| public String getName() |
| { |
| return AMQPFilterTypes.NO_LOCAL.toString(); |
| } |
| |
| @Override |
| public boolean matches(final Filterable message) |
| { |
| return message.getConnectionReference() != _connectionReference; |
| } |
| |
| @Override |
| public boolean startAtTail() |
| { |
| return false; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "NoLocalFilter[]"; |
| } |
| } |
| |
| private class GetDeliveryMethod implements ClientDeliveryMethod |
| { |
| |
| private final FlowCreditManager _singleMessageCredit; |
| private final MessageSource _queue; |
| private boolean _deliveredMessage; |
| |
| public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, |
| final MessageSource queue) |
| { |
| _singleMessageCredit = singleMessageCredit; |
| _queue = queue; |
| } |
| |
| @Override |
| public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, |
| final InstanceProperties props, final long deliveryTag) |
| { |
| _singleMessageCredit.useCreditForMessage(message.getSize()); |
| int queueSize = _queue instanceof Queue ? ((Queue<?>)_queue).getQueueDepthMessages() : 0; |
| long size = _connection.getProtocolOutputConverter().writeGetOk(message, |
| props, |
| AMQChannel.this.getChannelId(), |
| deliveryTag, |
| queueSize); |
| |
| _deliveredMessage = true; |
| return size; |
| } |
| |
| public boolean hasDeliveredMessage() |
| { |
| return _deliveredMessage; |
| } |
| } |
| |
| |
| private class ImmediateAction implements Action<MessageInstance> |
| { |
| |
| public ImmediateAction() |
| { |
| } |
| |
| public void performAction(MessageInstance entry) |
| { |
| TransactionLogResource queue = entry.getOwningResource(); |
| |
| if (!entry.getDeliveredToConsumer() && entry.acquire()) |
| { |
| |
| ServerTransaction txn = new LocalTransaction(_messageStore); |
| final AMQMessage message = (AMQMessage) entry.getMessage(); |
| MessageReference ref = message.newReference(); |
| try |
| { |
| entry.delete(); |
| txn.dequeue(entry.getEnqueueRecord(), |
| new ServerTransaction.Action() |
| { |
| @Override |
| public void postCommit() |
| { |
| final ProtocolOutputConverter outputConverter = |
| _connection.getProtocolOutputConverter(); |
| |
| outputConverter.writeReturn(message.getMessagePublishInfo(), |
| message.getContentHeaderBody(), |
| message, |
| _channelId, |
| AMQConstant.NO_CONSUMERS.getCode(), |
| IMMEDIATE_DELIVERY_REPLY_TEXT); |
| |
| } |
| |
| @Override |
| public void onRollback() |
| { |
| |
| } |
| } |
| ); |
| txn.commit(); |
| } |
| finally |
| { |
| ref.release(); |
| } |
| |
| |
| } |
| else |
| { |
| if(queue instanceof CapacityChecker) |
| { |
| ((CapacityChecker)queue).checkCapacity(AMQChannel.this); |
| } |
| } |
| |
| } |
| } |
| |
| private final class CapacityCheckAction implements Action<MessageInstance> |
| { |
| @Override |
| public void performAction(final MessageInstance entry) |
| { |
| TransactionLogResource queue = entry.getOwningResource(); |
| if(queue instanceof CapacityChecker) |
| { |
| ((CapacityChecker)queue).checkCapacity(AMQChannel.this); |
| } |
| } |
| } |
| |
| private class MessageAcknowledgeAction implements ServerTransaction.Action |
| { |
| private Collection<MessageInstance> _ackedMessages; |
| |
| public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages) |
| { |
| _ackedMessages = ackedMessages; |
| } |
| |
| public void postCommit() |
| { |
| try |
| { |
| for(MessageInstance entry : _ackedMessages) |
| { |
| entry.delete(); |
| } |
| } |
| finally |
| { |
| _ackedMessages = Collections.emptySet(); |
| } |
| |
| } |
| |
| public void onRollback() |
| { |
| // explicit rollbacks resend the message after the rollback-ok is sent |
| if(_rollingBack) |
| { |
| for(MessageInstance entry : _ackedMessages) |
| { |
| entry.makeAcquisitionStealable(); |
| } |
| _resendList.addAll(_ackedMessages); |
| } |
| else |
| { |
| try |
| { |
| for(MessageInstance entry : _ackedMessages) |
| { |
| entry.release(entry.getAcquiringConsumer()); |
| } |
| } |
| finally |
| { |
| _ackedMessages = Collections.emptySet(); |
| } |
| } |
| |
| } |
| } |
| |
| private class WriteReturnAction implements ServerTransaction.Action |
| { |
| private final AMQConstant _errorCode; |
| private final String _description; |
| private final MessageReference<AMQMessage> _reference; |
| |
| public WriteReturnAction(AMQConstant errorCode, |
| String description, |
| AMQMessage message) |
| { |
| _errorCode = errorCode; |
| _description = description; |
| _reference = message.newReference(); |
| } |
| |
| public void postCommit() |
| { |
| AMQMessage message = _reference.getMessage(); |
| _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), |
| message.getContentHeaderBody(), |
| message, |
| _channelId, |
| _errorCode.getCode(), |
| AMQShortString.validValueOf(_description)); |
| _reference.release(); |
| } |
| |
| public void onRollback() |
| { |
| _reference.release(); |
| } |
| } |
| |
| public synchronized void block() |
| { |
| if(_blockingEntities.add(this)) |
| { |
| |
| if(_blocking.compareAndSet(false,true)) |
| { |
| messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **")); |
| |
| |
| getConnection().notifyWork(); |
| } |
| } |
| } |
| |
| public synchronized void unblock() |
| { |
| if(_blockingEntities.remove(this)) |
| { |
| if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) |
| { |
| messageWithSubject(ChannelMessages.FLOW_REMOVED()); |
| getConnection().notifyWork(); |
| } |
| } |
| } |
| |
| |
| public synchronized void block(Queue<?> queue) |
| { |
| if(_blockingEntities.add(queue)) |
| { |
| |
| if(_blocking.compareAndSet(false,true)) |
| { |
| messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName())); |
| getConnection().notifyWork(); |
| |
| } |
| } |
| } |
| |
| public synchronized void unblock(Queue<?> queue) |
| { |
| if(_blockingEntities.remove(queue)) |
| { |
| if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) |
| { |
| messageWithSubject(ChannelMessages.FLOW_REMOVED()); |
| getConnection().notifyWork(); |
| } |
| } |
| } |
| |
| @Override |
| public void transportStateChanged() |
| { |
| _creditManager.restoreCredit(0, 0); |
| _noAckCreditManager.restoreCredit(0, 0); |
| } |
| |
| @Override |
| public Object getConnectionReference() |
| { |
| return getConnection().getReference(); |
| } |
| |
| public int getUnacknowledgedMessageCount() |
| { |
| return getUnacknowledgedMessageMap().size(); |
| } |
| |
| private void flow(boolean flow) |
| { |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); |
| _connection.writeFrame(responseBody.generateFrame(_channelId)); |
| } |
| |
| @Override |
| public boolean getBlocking() |
| { |
| return _blocking.get(); |
| } |
| |
| public NamedAddressSpace getAddressSpace() |
| { |
| return getConnection().getAddressSpace(); |
| } |
| |
| private void deadLetter(long deliveryTag) |
| { |
| final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); |
| final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag); |
| |
| if (rejectedQueueEntry == null) |
| { |
| _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); |
| } |
| else |
| { |
| final ServerMessage msg = rejectedQueueEntry.getMessage(); |
| int requeues = 0; |
| if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer())) |
| { |
| requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>() |
| { |
| @Override |
| public void performAction(final MessageInstance requeueEntry) |
| { |
| messageWithSubject(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), |
| requeueEntry.getOwningResource() |
| .getName())); |
| } |
| }, null); |
| } |
| |
| if(requeues == 0) |
| { |
| |
| final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource(); |
| if(owningResource instanceof Queue) |
| { |
| final Queue<?> queue = (Queue<?>) owningResource; |
| |
| final Exchange altExchange = queue.getAlternateExchange(); |
| |
| if (altExchange == null) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug( |
| "No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " |
| + deliveryTag); |
| } |
| messageWithSubject(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), |
| queue.getName(), |
| msg.getInitialRoutingAddress())); |
| |
| } |
| else |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug( |
| "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " |
| + deliveryTag); |
| } |
| messageWithSubject(ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), |
| altExchange.getName())); |
| } |
| } |
| } |
| |
| } |
| } |
| |
| public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action) |
| { |
| _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); |
| } |
| |
| public void sync() |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("sync() called on channel " + debugIdentity()); |
| } |
| |
| AsyncCommand cmd; |
| while((cmd = _unfinishedCommandsQueue.poll()) != null) |
| { |
| cmd.complete(); |
| } |
| if(_transaction instanceof LocalTransaction) |
| { |
| ((LocalTransaction)_transaction).sync(); |
| } |
| } |
| |
| private static class AsyncCommand |
| { |
| private final ListenableFuture<Void> _future; |
| private ServerTransaction.Action _action; |
| |
| public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action) |
| { |
| _future = future; |
| _action = action; |
| } |
| |
| void complete() |
| { |
| boolean interrupted = false; |
| try |
| { |
| while (true) |
| { |
| try |
| { |
| _future.get(); |
| break; |
| } |
| catch (InterruptedException e) |
| { |
| interrupted = true; |
| } |
| |
| } |
| } |
| catch(ExecutionException e) |
| { |
| if(e.getCause() instanceof RuntimeException) |
| { |
| throw (RuntimeException)e.getCause(); |
| } |
| else if(e.getCause() instanceof Error) |
| { |
| throw (Error) e.getCause(); |
| } |
| else |
| { |
| throw new ServerScopedRuntimeException(e.getCause()); |
| } |
| } |
| if(interrupted) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| _action.postCommit(); |
| _action = null; |
| } |
| } |
| |
| @Override |
| public int getConsumerCount() |
| { |
| return _tag2SubscriptionTargetMap.size(); |
| } |
| |
| @Override |
| public Collection<Consumer<?>> getConsumers() |
| { |
| return Collections.unmodifiableCollection(_consumers); |
| } |
| |
| private class ConsumerClosedListener implements ConfigurationChangeListener |
| { |
| @Override |
| public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) |
| { |
| if(newState == State.DELETED) |
| { |
| consumerRemoved((Consumer<?>)object); |
| } |
| } |
| |
| @Override |
| public void childAdded(final ConfiguredObject object, final ConfiguredObject child) |
| { |
| |
| } |
| |
| @Override |
| public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) |
| { |
| |
| } |
| |
| @Override |
| public void attributeSet(final ConfiguredObject object, |
| final String attributeName, |
| final Object oldAttributeValue, |
| final Object newAttributeValue) |
| { |
| |
| } |
| |
| @Override |
| public void bulkChangeStart(final ConfiguredObject<?> object) |
| { |
| |
| } |
| |
| @Override |
| public void bulkChangeEnd(final ConfiguredObject<?> object) |
| { |
| |
| } |
| } |
| |
| private void consumerAdded(final Consumer<?> consumer) |
| { |
| for(ConsumerListener l : _consumerListeners) |
| { |
| l.consumerAdded(consumer); |
| } |
| } |
| |
| private void consumerRemoved(final Consumer<?> consumer) |
| { |
| for(ConsumerListener l : _consumerListeners) |
| { |
| l.consumerRemoved(consumer); |
| } |
| } |
| |
| @Override |
| public void addConsumerListener(ConsumerListener listener) |
| { |
| _consumerListeners.add(listener); |
| } |
| |
| @Override |
| public void removeConsumerListener(ConsumerListener listener) |
| { |
| _consumerListeners.remove(listener); |
| } |
| |
| @Override |
| public void setModelObject(final Session<?> session) |
| { |
| _modelObject = session; |
| } |
| |
| @Override |
| public Session<?> getModelObject() |
| { |
| return _modelObject; |
| } |
| |
| @Override |
| public long getTransactionStartTime() |
| { |
| ServerTransaction serverTransaction = _transaction; |
| if (serverTransaction.isTransactional()) |
| { |
| return serverTransaction.getTransactionStartTime(); |
| } |
| else |
| { |
| return 0L; |
| } |
| } |
| |
| @Override |
| public long getTransactionUpdateTime() |
| { |
| ServerTransaction serverTransaction = _transaction; |
| if (serverTransaction.isTransactional()) |
| { |
| return serverTransaction.getTransactionUpdateTime(); |
| } |
| else |
| { |
| return 0L; |
| } |
| } |
| |
| @Override |
| public void receiveAccessRequest(final AMQShortString realm, |
| final boolean exclusive, |
| final boolean passive, |
| final boolean active, final boolean write, final boolean read) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] AccessRequest[" +" realm: " + realm + |
| " exclusive: " + exclusive + |
| " passive: " + passive + |
| " active: " + active + |
| " write: " + write + " read: " + read + " ]"); |
| } |
| |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| |
| if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion())) |
| { |
| _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, |
| "AccessRequest not present in AMQP versions other than 0-8, 0-9", |
| _channelId); |
| } |
| else |
| { |
| // We don't implement access control class, but to keep clients happy that expect it |
| // always use the "0" ticket. |
| AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); |
| sync(); |
| _connection.writeFrame(response.generateFrame(_channelId)); |
| } |
| } |
| |
| @Override |
| public void receiveBasicAck(final long deliveryTag, final boolean multiple) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]"); |
| } |
| |
| acknowledgeMessage(deliveryTag, multiple); |
| } |
| |
| @Override |
| public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait) |
| { |
| |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicCancel[" +" consumerTag: " + consumerTag + " noWait: " + nowait + " ]"); |
| } |
| |
| unsubscribeConsumer(consumerTag); |
| if (!nowait) |
| { |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag); |
| sync(); |
| _connection.writeFrame(cancelOkBody.generateFrame(_channelId)); |
| } |
| } |
| |
| @Override |
| public void receiveBasicConsume(final AMQShortString queue, |
| final AMQShortString consumerTag, |
| final boolean noLocal, |
| final boolean noAck, |
| final boolean exclusive, final boolean nowait, final FieldTable arguments) |
| { |
| |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue + |
| " consumerTag: " + consumerTag + |
| " noLocal: " + noLocal + |
| " noAck: " + noAck + |
| " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]"); |
| } |
| |
| AMQShortString consumerTag1 = consumerTag; |
| NamedAddressSpace vHost = _connection.getAddressSpace(); |
| sync(); |
| String queueName = AMQShortString.toString(queue); |
| |
| MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName); |
| final Collection<MessageSource> sources = new HashSet<>(); |
| |
| if (arguments != null && arguments.get("x-multiqueue") instanceof Collection) |
| { |
| for (Object object : (Collection<Object>) arguments.get("x-multiqueue")) |
| { |
| String sourceName = String.valueOf(object); |
| sourceName = sourceName.trim(); |
| if (sourceName.length() != 0) |
| { |
| MessageSource source = vHost.getAttainedMessageSource(sourceName); |
| if (source == null) |
| { |
| sources.clear(); |
| break; |
| } |
| else |
| { |
| sources.add(source); |
| } |
| } |
| } |
| queueName = arguments.get("x-multiqueue").toString(); |
| } |
| else if (queue1 != null) |
| { |
| sources.add(queue1); |
| } |
| |
| |
| if (sources.isEmpty()) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("No queue for '" + queueName + "'"); |
| } |
| if (queueName != null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'"); |
| } |
| else |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "No queue name provided, no default queue defined.", _channelId); |
| } |
| } |
| else |
| { |
| try |
| { |
| consumerTag1 = consumeFromSource(consumerTag1, |
| sources, |
| !noAck, |
| arguments, |
| exclusive, |
| noLocal); |
| if (!nowait) |
| { |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1); |
| _connection.writeFrame(responseBody.generateFrame(_channelId)); |
| |
| } |
| } |
| catch (ConsumerTagInUseException cte) |
| { |
| |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "Non-unique consumer tag, '" + consumerTag1 |
| + "'", _channelId); |
| } |
| catch (AMQInvalidArgumentException ise) |
| { |
| _connection.sendConnectionClose(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId); |
| |
| |
| } |
| catch (Queue.ExistingExclusiveConsumer e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, |
| "Cannot subscribe to queue '" |
| + queue1.getName() |
| + "' as it already has an existing exclusive consumer", _channelId); |
| |
| } |
| catch (Queue.ExistingConsumerPreventsExclusive e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, |
| "Cannot subscribe to queue '" |
| + queue1.getName() |
| + "' exclusively as it already has a consumer", _channelId); |
| |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue '" |
| + queue1.getName() |
| + "' permission denied", _channelId); |
| |
| } |
| catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, |
| "Cannot subscribe to queue '" |
| + queue1.getName() |
| + "' as it already has an incompatible exclusivity policy", _channelId); |
| |
| } |
| |
| } |
| } |
| |
| @Override |
| public void receiveBasicGet(final AMQShortString queueName, final boolean noAck) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]"); |
| } |
| |
| NamedAddressSpace vHost = _connection.getAddressSpace(); |
| sync(); |
| MessageSource queue = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName.toString()); |
| if (queue == null) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("No queue for '" + queueName + "'"); |
| } |
| if (queueName != null) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId); |
| |
| } |
| else |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "No queue name provided, no default queue defined.", _channelId); |
| |
| } |
| } |
| else |
| { |
| |
| try |
| { |
| if (!performGet(queue, !noAck)) |
| { |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| |
| BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); |
| |
| _connection.writeFrame(responseBody.generateFrame(_channelId)); |
| } |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId); |
| } |
| catch (MessageSource.ExistingExclusiveConsumer e) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId); |
| } |
| catch (MessageSource.ExistingConsumerPreventsExclusive e) |
| { |
| _connection.sendConnectionClose(AMQConstant.INTERNAL_ERROR, |
| "The GET request has been evaluated as an exclusive consumer, " + |
| "this is likely due to a programming error in the Qpid broker", _channelId); |
| } |
| catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "Queue has an incompatible exclusivity policy", _channelId); |
| } |
| } |
| } |
| |
| @Override |
| public void receiveBasicPublish(final AMQShortString exchangeName, |
| final AMQShortString routingKey, |
| final boolean mandatory, |
| final boolean immediate) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicPublish[" +" exchange: " + exchangeName + |
| " routingKey: " + routingKey + |
| " mandatory: " + mandatory + |
| " immediate: " + immediate + " ]"); |
| } |
| |
| |
| |
| NamedAddressSpace vHost = _connection.getAddressSpace(); |
| |
| if(blockingTimeoutExceeded()) |
| { |
| message(ChannelMessages.FLOW_CONTROL_IGNORED()); |
| closeChannel(AMQConstant.MESSAGE_TOO_LARGE, |
| "Channel flow control was requested, but not enforced by sender"); |
| } |
| else |
| { |
| MessageDestination destination; |
| |
| if (isDefaultExchange(exchangeName)) |
| { |
| destination = vHost.getDefaultDestination(); |
| } |
| else |
| { |
| destination = vHost.getAttainedMessageDestination(exchangeName.toString()); |
| } |
| |
| // if the exchange does not exist we raise a channel exception |
| if (destination == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: '" + exchangeName + "'"); |
| } |
| else |
| { |
| |
| MessagePublishInfo info = new MessagePublishInfo(exchangeName, |
| immediate, |
| mandatory, |
| routingKey); |
| |
| try |
| { |
| setPublishFrame(info, destination); |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| |
| } |
| } |
| } |
| } |
| |
| @Override |
| public EventLogger getEventLogger() |
| { |
| return getConnection().getEventLogger(); |
| } |
| |
| private boolean blockingTimeoutExceeded() |
| { |
| |
| return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; |
| } |
| |
| @Override |
| public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicQos[" +" prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]"); |
| } |
| |
| sync(); |
| setCredit(prefetchSize, prefetchCount); |
| |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| } |
| |
| @Override |
| public void receiveBasicRecover(final boolean requeue, final boolean sync) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]"); |
| } |
| |
| resend(); |
| |
| if (sync) |
| { |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); |
| sync(); |
| _connection.writeFrame(recoverOk.generateFrame(getChannelId())); |
| |
| } |
| |
| } |
| |
| @Override |
| public void receiveBasicReject(final long deliveryTag, final boolean requeue) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]"); |
| } |
| |
| MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag); |
| |
| if (message == null) |
| { |
| _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); |
| } |
| else |
| { |
| |
| if (message.getMessage() == null) |
| { |
| _logger.warn("Message has already been purged, unable to Reject."); |
| } |
| else |
| { |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejecting: DT:" + deliveryTag |
| + "-" + message.getMessage() + |
| ": Requeue:" + requeue |
| + |
| " on channel:" + debugIdentity()); |
| } |
| |
| if (requeue) |
| { |
| message.decrementDeliveryCount(); |
| |
| requeue(deliveryTag); |
| } |
| else |
| { |
| // Since the JMS client abuses the reject flag for requeing after rollback, we won't set reject here |
| // as it would prevent redelivery |
| // message.reject(); |
| |
| final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("maxDeliveryCountEnabled: " |
| + maxDeliveryCountEnabled |
| + " deliveryTag " |
| + deliveryTag); |
| } |
| if (maxDeliveryCountEnabled) |
| { |
| final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("deliveredTooManyTimes: " |
| + deliveredTooManyTimes |
| + " deliveryTag " |
| + deliveryTag); |
| } |
| if (deliveredTooManyTimes) |
| { |
| deadLetter(deliveryTag); |
| } |
| else |
| { |
| //this requeue represents a message rejected because of a recover/rollback that we |
| //are not ready to DLQ. We rely on the reject command to resend from the unacked map |
| //and therefore need to increment the delivery counter so we cancel out the effect |
| //of the AMQChannel#resend() decrement. |
| message.incrementDeliveryCount(); |
| } |
| } |
| else |
| { |
| requeue(deliveryTag); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void receiveChannelClose(final int replyCode, |
| final AMQShortString replyText, |
| final int classId, |
| final int methodId) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ChannelClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]"); |
| } |
| |
| |
| sync(); |
| _connection.closeChannel(this); |
| |
| _connection.writeFrame(new AMQFrame(getChannelId(), |
| _connection.getMethodRegistry().createChannelCloseOkBody())); |
| } |
| |
| @Override |
| public void receiveChannelCloseOk() |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ChannelCloseOk"); |
| } |
| |
| _connection.closeChannelOk(getChannelId()); |
| } |
| |
| @Override |
| public void receiveMessageContent(final QpidByteBuffer data) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] "); |
| } |
| |
| if(hasCurrentMessage()) |
| { |
| publishContentBody(new ContentBody(data)); |
| } |
| else |
| { |
| _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, |
| "Attempt to send a content header without first sending a publish frame", |
| _channelId); |
| } |
| } |
| |
| @Override |
| public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]"); |
| } |
| |
| if(hasCurrentMessage()) |
| { |
| if(bodySize > _connection.getMaxMessageSize()) |
| { |
| closeChannel(AMQConstant.MESSAGE_TOO_LARGE, |
| "Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize()); |
| } |
| publishContentHeader(new ContentHeaderBody(properties, bodySize)); |
| } |
| else |
| { |
| _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, |
| "Attempt to send a content header without first sending a publish frame", |
| _channelId); |
| } |
| } |
| |
| @Override |
| public boolean ignoreAllButCloseOk() |
| { |
| return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId); |
| } |
| |
| @Override |
| public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]"); |
| } |
| |
| Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>(); |
| _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap); |
| |
| for(MessageInstance message : nackedMessageMap.values()) |
| { |
| |
| if (message == null) |
| { |
| _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag); |
| } |
| else |
| { |
| |
| if (message.getMessage() == null) |
| { |
| _logger.warn("Message has already been purged, unable to nack."); |
| } |
| else |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Nack-ing: DT:" + deliveryTag |
| + "-" + message.getMessage() + |
| ": Requeue:" + requeue |
| + |
| " on channel:" + debugIdentity()); |
| } |
| |
| if (requeue) |
| { |
| message.decrementDeliveryCount(); |
| |
| requeue(deliveryTag); |
| } |
| else |
| { |
| message.reject(); |
| |
| final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("maxDeliveryCountEnabled: " |
| + maxDeliveryCountEnabled |
| + " deliveryTag " |
| + deliveryTag); |
| } |
| if (maxDeliveryCountEnabled) |
| { |
| final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("deliveredTooManyTimes: " |
| + deliveredTooManyTimes |
| + " deliveryTag " |
| + deliveryTag); |
| } |
| if (deliveredTooManyTimes) |
| { |
| deadLetter(deliveryTag); |
| } |
| else |
| { |
| message.incrementDeliveryCount(); |
| } |
| } |
| else |
| { |
| requeue(deliveryTag); |
| } |
| } |
| } |
| } |
| |
| } |
| |
| } |
| |
| @Override |
| public void receiveChannelFlow(final boolean active) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ChannelFlow[" +" active: " + active + " ]"); |
| } |
| |
| |
| sync(); |
| setSuspended(!active); |
| |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| } |
| |
| @Override |
| public void receiveChannelFlowOk(final boolean active) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ChannelFlowOk[" +" active: " + active + " ]"); |
| } |
| |
| // TODO - should we do anything here? |
| } |
| |
| @Override |
| public void receiveExchangeBound(final AMQShortString exchangeName, |
| final AMQShortString routingKey, |
| final AMQShortString queueName) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " + |
| routingKey + " queue: " + queueName + " ]"); |
| } |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| |
| sync(); |
| |
| int replyCode; |
| String replyText; |
| |
| if (isDefaultExchange(exchangeName)) |
| { |
| if (routingKey == null) |
| { |
| if (queueName == null) |
| { |
| replyCode = virtualHost.hasMessageSources() |
| ? ExchangeBoundOkBody.OK |
| : ExchangeBoundOkBody.NO_BINDINGS; |
| replyText = null; |
| |
| } |
| else |
| { |
| MessageSource queue = virtualHost.getAttainedMessageSource(queueName.toString()); |
| if (queue == null) |
| { |
| replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; |
| replyText = "Queue '" + queueName + "' not found"; |
| } |
| else |
| { |
| replyCode = ExchangeBoundOkBody.OK; |
| replyText = null; |
| } |
| } |
| } |
| else |
| { |
| if (queueName == null) |
| { |
| replyCode = virtualHost.getAttainedMessageDestination(routingKey.toString()) instanceof Queue |
| ? ExchangeBoundOkBody.OK |
| : ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK; |
| replyText = null; |
| } |
| else |
| { |
| MessageDestination destination = virtualHost.getAttainedMessageDestination(queueName.toString()); |
| Queue<?> queue = destination instanceof Queue ? (Queue) destination : null; |
| if (queue == null) |
| { |
| |
| replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; |
| replyText = "Queue '" + queueName + "' not found"; |
| } |
| else |
| { |
| replyCode = queueName.equals(routingKey) |
| ? ExchangeBoundOkBody.OK |
| : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK; |
| replyText = null; |
| } |
| } |
| } |
| } |
| else |
| { |
| Exchange<?> exchange = getExchange(exchangeName.toString()); |
| if (exchange == null) |
| { |
| |
| replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND; |
| replyText = "Exchange '" + exchangeName + "' not found"; |
| } |
| else if (routingKey == null) |
| { |
| if (queueName == null) |
| { |
| if (exchange.hasBindings()) |
| { |
| replyCode = ExchangeBoundOkBody.OK; |
| replyText = null; |
| } |
| else |
| { |
| replyCode = ExchangeBoundOkBody.NO_BINDINGS; |
| replyText = null; |
| } |
| } |
| else |
| { |
| Queue<?> queue = getQueue(queueName.toString()); |
| if (queue == null) |
| { |
| replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; |
| replyText = "Queue '" + queueName + "' not found"; |
| } |
| else |
| { |
| if (exchange.isBound(queue)) |
| { |
| replyCode = ExchangeBoundOkBody.OK; |
| replyText = null; |
| } |
| else |
| { |
| replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND; |
| replyText = "Queue '" |
| + queueName |
| + "' not bound to exchange '" |
| + exchangeName |
| + "'"; |
| } |
| } |
| } |
| } |
| else if (queueName != null) |
| { |
| Queue<?> queue = getQueue(queueName.toString()); |
| if (queue == null) |
| { |
| replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND; |
| replyText = "Queue '" + queueName + "' not found"; |
| } |
| else |
| { |
| String bindingKey = routingKey == null ? null : routingKey.toString(); |
| if (exchange.isBound(bindingKey, queue)) |
| { |
| |
| replyCode = ExchangeBoundOkBody.OK; |
| replyText = null; |
| } |
| else |
| { |
| replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK; |
| replyText = "Queue '" + queueName + "' not bound with routing key '" + |
| routingKey + "' to exchange '" + exchangeName + "'"; |
| |
| } |
| } |
| } |
| else |
| { |
| if (exchange.isBound(routingKey == null ? "" : routingKey.toString())) |
| { |
| |
| replyCode = ExchangeBoundOkBody.OK; |
| replyText = null; |
| } |
| else |
| { |
| replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK; |
| replyText = |
| "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'"; |
| } |
| } |
| } |
| |
| ExchangeBoundOkBody exchangeBoundOkBody = |
| methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText)); |
| |
| _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId())); |
| |
| } |
| |
/**
 * Handles an Exchange.Declare request.
 * <p>
 * Declaring the default exchange only succeeds for the direct exchange type.
 * A passive declare checks that the exchange exists and, when a type is
 * given, that it matches. A non-passive declare creates the exchange in the
 * address space; if it already exists the declared type must match the
 * existing one. Failures close either the channel or the connection, as
 * coded in each branch. On success Exchange.DeclareOk is sent unless
 * {@code nowait} is set.
 *
 * @param exchangeName name of the exchange
 * @param type         exchange type, may be {@code null}
 * @param passive      whether to only check for existence rather than create
 * @param durable      durability requested for a newly created exchange
 * @param autoDelete   whether a newly created exchange is deleted when it has no links
 * @param internal     the internal flag from the frame (only logged here)
 * @param nowait       when set, no DeclareOk reply is sent
 * @param arguments    additional creation arguments, may be {@code null}
 */
@Override
public void receiveExchangeDeclare(final AMQShortString exchangeName,
                                   final AMQShortString type,
                                   final boolean passive,
                                   final boolean durable,
                                   final boolean autoDelete,
                                   final boolean internal,
                                   final boolean nowait,
                                   final FieldTable arguments)
{
    if (_logger.isDebugEnabled())
    {
        _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName +
                      " type: " + type +
                      " passive: " + passive +
                      " durable: " + durable +
                      " autoDelete: " + autoDelete +
                      " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
    }

    final MethodRegistry methodRegistry = _connection.getMethodRegistry();
    final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody();
    final NamedAddressSpace addressSpace = _connection.getAddressSpace();

    if (isDefaultExchange(exchangeName))
    {
        final boolean declaredAsDirect =
                new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type);
        if (declaredAsDirect)
        {
            sendExchangeDeclareOkUnlessNowait(nowait, declareOkBody);
        }
        else
        {
            _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
                                                                     + " of type "
                                                                     + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
                                                                     + " to " + type + ".", getChannelId());
        }
        return;
    }

    if (passive)
    {
        final Exchange<?> existing = getExchange(exchangeName.toString());
        if (existing == null)
        {
            closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'");
        }
        else if (type != null && type.length() != 0 && !existing.getType().equals(type.toString()))
        {
            _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: '"
                                                                     + exchangeName
                                                                     + "' of type "
                                                                     + existing.getType()
                                                                     + " to "
                                                                     + type
                                                                     + ".", getChannelId());
        }
        else
        {
            sendExchangeDeclareOkUnlessNowait(nowait, declareOkBody);
        }
        return;
    }

    final String name = exchangeName.toString();
    final String typeString = type == null ? null : type.toString();
    try
    {
        final Map<String, Object> attributes = new HashMap<>();
        if (arguments != null)
        {
            attributes.putAll(FieldTable.convertToMap(arguments));
        }
        attributes.put(Exchange.NAME, name);
        attributes.put(Exchange.TYPE, typeString);
        attributes.put(Exchange.DURABLE, durable);
        attributes.put(Exchange.LIFETIME_POLICY,
                       autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
        if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
        {
            attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
        }
        addressSpace.createMessageDestination(Exchange.class, attributes);

        sendExchangeDeclareOkUnlessNowait(nowait, declareOkBody);
    }
    catch (ReservedExchangeNameException e)
    {
        // A reserved name is tolerated only when it refers to an existing exchange of the same type.
        final Exchange<?> reserved = getExchange(name);
        if (reserved != null && reserved.getType().equals(typeString))
        {
            sendExchangeDeclareOkUnlessNowait(nowait, declareOkBody);
        }
        else
        {
            _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
                                            "Attempt to declare exchange: '" + exchangeName +
                                            "' which begins with reserved prefix.", getChannelId());
        }
    }
    catch (ExchangeExistsException e)
    {
        final Exchange<?> existing = e.getExistingExchange();
        if (existing.getType().equals(typeString))
        {
            sendExchangeDeclareOkUnlessNowait(nowait, declareOkBody);
        }
        else
        {
            _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: '"
                                                                     + exchangeName + "' of type "
                                                                     + existing.getType()
                                                                     + " to " + type + ".", getChannelId());
        }
    }
    catch (NoFactoryForTypeException e)
    {
        _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
                                                                     + e.getType()
                                                                     + "' for exchange '"
                                                                     + exchangeName
                                                                     + "'", getChannelId());
    }
    catch (AccessControlException e)
    {
        _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
    }
    catch (UnknownConfiguredObjectException e)
    {
        // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
        final String detail = e.getName() != null
                ? "name: '" + e.getName() + "'"
                : "id: " + e.getId();
        final String message = "Unknown alternate exchange " + detail;
        _connection.sendConnectionClose(AMQConstant.NOT_FOUND, message, getChannelId());
    }
    catch (IllegalArgumentException e)
    {
        _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
                                                                     + exchangeName
                                                                     + "': "
                                                                     + e.getMessage(), getChannelId());
    }
}

/**
 * Writes the given Exchange.DeclareOk body on this channel after a sync,
 * unless the client asked for no reply.
 *
 * @param nowait        when set, nothing is sent
 * @param declareOkBody the prepared DeclareOk method body
 */
private void sendExchangeDeclareOkUnlessNowait(final boolean nowait, final AMQMethodBody declareOkBody)
{
    if (nowait)
    {
        return;
    }
    sync();
    _connection.writeFrame(declareOkBody.generateFrame(getChannelId()));
}
| |
| @Override |
| public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]"); |
| } |
| |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| sync(); |
| |
| if (isDefaultExchange(exchangeStr)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "Default Exchange cannot be deleted", getChannelId()); |
| |
| } |
| |
| else |
| { |
| final String exchangeName = exchangeStr.toString(); |
| |
| final Exchange<?> exchange = getExchange(exchangeName); |
| if (exchange == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "No such exchange: '" + exchangeStr + "'"); |
| } |
| else |
| { |
| if (ifUnused && exchange.hasBindings()) |
| { |
| closeChannel(AMQConstant.IN_USE, "Exchange has bindings"); |
| } |
| else |
| { |
| try |
| { |
| exchange.delete(); |
| |
| |
| if (!nowait) |
| { |
| ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| } |
| } |
| catch (ExchangeIsAlternateException e) |
| { |
| closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange"); |
| } |
| catch (RequiredExchangeException e) |
| { |
| closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted"); |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void receiveQueueBind(final AMQShortString queueName, |
| final AMQShortString exchange, |
| AMQShortString routingKey, |
| final boolean nowait, |
| final FieldTable argumentsTable) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName + |
| " exchange: " + exchange + |
| " bindingKey: " + routingKey + |
| " nowait: " + nowait + " arguments: " + argumentsTable + " ]"); |
| } |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| Queue<?> queue; |
| if (queueName == null) |
| { |
| |
| queue = getDefaultQueue(); |
| |
| if (queue != null) |
| { |
| if (routingKey == null) |
| { |
| routingKey = AMQShortString.valueOf(queue.getName()); |
| } |
| } |
| } |
| else |
| { |
| queue = getQueue(queueName.toString()); |
| routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; |
| } |
| |
| if (queue == null) |
| { |
| String message = queueName == null |
| ? "No default queue defined on channel and queue was null" |
| : "Queue " + queueName + " does not exist."; |
| closeChannel(AMQConstant.NOT_FOUND, message); |
| } |
| else if (isDefaultExchange(exchange)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "Cannot bind the queue '" + queueName + "' to the default exchange", getChannelId()); |
| |
| } |
| else |
| { |
| |
| final String exchangeName = exchange.toString(); |
| |
| final Exchange<?> exch = getExchange(exchangeName); |
| if (exch == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, |
| "Exchange '" + exchangeName + "' does not exist."); |
| } |
| else |
| { |
| |
| try |
| { |
| |
| Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable); |
| String bindingKey = String.valueOf(routingKey); |
| |
| if (!exch.isBound(bindingKey, arguments, queue)) |
| { |
| |
| if (!exch.addBinding(bindingKey, queue, arguments) |
| && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals( |
| exch.getType())) |
| { |
| exch.replaceBinding(bindingKey, queue, arguments); |
| } |
| } |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Binding queue " |
| + queue |
| + " to exchange " |
| + exch |
| + " with routing key " |
| + routingKey); |
| } |
| if (!nowait) |
| { |
| sync(); |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| } |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void receiveQueueDeclare(final AMQShortString queueStr, |
| final boolean passive, |
| final boolean durable, |
| final boolean exclusive, |
| final boolean autoDelete, |
| final boolean nowait, |
| final FieldTable arguments) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr + |
| " passive: " + passive + |
| " durable: " + durable + |
| " exclusive: " + exclusive + |
| " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]"); |
| } |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| |
| final AMQShortString queueName; |
| |
| // if we aren't given a queue name, we create one which we return to the client |
| if ((queueStr == null) || (queueStr.length() == 0)) |
| { |
| queueName = new AMQShortString("tmp_" + UUID.randomUUID()); |
| } |
| else |
| { |
| queueName = queueStr; |
| } |
| |
| Queue<?> queue; |
| |
| //TODO: do we need to check that the queue already exists with exactly the same "configuration"? |
| |
| |
| if (passive) |
| { |
| queue = getQueue(queueName.toString()); |
| if (queue == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, |
| "Queue: '" |
| + queueName |
| + "' not found on VirtualHost '" |
| + virtualHost.getName() |
| + "'."); |
| } |
| else |
| { |
| if (!queue.verifySessionAccess(this)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '" |
| + queue.getName() |
| + "' is exclusive, but not created on this Connection.", getChannelId()); |
| } |
| else |
| { |
| //set this as the default queue on the channel: |
| setDefaultQueue(queue); |
| if (!nowait) |
| { |
| sync(); |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| QueueDeclareOkBody responseBody = |
| methodRegistry.createQueueDeclareOkBody(queueName, |
| queue.getQueueDepthMessages(), |
| queue.getConsumerCount()); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Queue " + queueName + " declared successfully"); |
| } |
| } |
| } |
| } |
| } |
| else |
| { |
| |
| try |
| { |
| Map<String, Object> attributes = |
| QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments)); |
| final String queueNameString = AMQShortString.toString(queueName); |
| attributes.put(Queue.NAME, queueNameString); |
| attributes.put(Queue.DURABLE, durable); |
| |
| LifetimePolicy lifetimePolicy; |
| ExclusivityPolicy exclusivityPolicy; |
| |
| if (exclusive) |
| { |
| lifetimePolicy = autoDelete |
| ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS |
| : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; |
| exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; |
| } |
| else |
| { |
| lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; |
| exclusivityPolicy = ExclusivityPolicy.NONE; |
| } |
| |
| if(!attributes.containsKey(Queue.EXCLUSIVE)) |
| { |
| attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); |
| } |
| if(!attributes.containsKey(Queue.LIFETIME_POLICY)) |
| { |
| attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); |
| } |
| |
| queue = virtualHost.createMessageSource(Queue.class, attributes); |
| |
| setDefaultQueue(queue); |
| |
| if (!nowait) |
| { |
| sync(); |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| QueueDeclareOkBody responseBody = |
| methodRegistry.createQueueDeclareOkBody(queueName, |
| queue.getQueueDepthMessages(), |
| queue.getConsumerCount()); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Queue " + queueName + " declared successfully"); |
| } |
| } |
| } |
| catch (QueueExistsException qe) |
| { |
| |
| queue = qe.getExistingQueue(); |
| |
| if (!queue.verifySessionAccess(this)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '" |
| + queue.getName() |
| + "' is exclusive, but not created on this Connection.", getChannelId()); |
| |
| } |
| else if (queue.isExclusive() != exclusive) |
| { |
| |
| closeChannel(AMQConstant.ALREADY_EXISTS, |
| "Cannot re-declare queue '" |
| + queue.getName() |
| + "' with different exclusivity (was: " |
| + queue.isExclusive() |
| + " requested " |
| + exclusive |
| + ")"); |
| } |
| else if ((autoDelete |
| && queue.getLifetimePolicy() == LifetimePolicy.PERMANENT) |
| || (!autoDelete && queue.getLifetimePolicy() != ((exclusive |
| && !durable) |
| ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE |
| : LifetimePolicy.PERMANENT))) |
| { |
| closeChannel(AMQConstant.ALREADY_EXISTS, |
| "Cannot re-declare queue '" |
| + queue.getName() |
| + "' with different lifetime policy (was: " |
| + queue.getLifetimePolicy() |
| + " requested autodelete: " |
| + autoDelete |
| + ")"); |
| } |
| else |
| { |
| setDefaultQueue(queue); |
| if (!nowait) |
| { |
| sync(); |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| QueueDeclareOkBody responseBody = |
| methodRegistry.createQueueDeclareOkBody(queueName, |
| queue.getQueueDepthMessages(), |
| queue.getConsumerCount()); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Queue " + queueName + " declared successfully"); |
| } |
| } |
| } |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| } |
| |
| } |
| } |
| |
| @Override |
| public void receiveQueueDelete(final AMQShortString queueName, |
| final boolean ifUnused, |
| final boolean ifEmpty, |
| final boolean nowait) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]"); |
| } |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| sync(); |
| Queue<?> queue; |
| if (queueName == null) |
| { |
| |
| //get the default queue on the channel: |
| queue = getDefaultQueue(); |
| } |
| else |
| { |
| queue = getQueue(queueName.toString()); |
| } |
| |
| if (queue == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist."); |
| |
| } |
| else |
| { |
| if (ifEmpty && !queue.isEmpty()) |
| { |
| closeChannel(AMQConstant.IN_USE, "Queue: '" + queueName + "' is not empty."); |
| } |
| else if (ifUnused && !queue.isUnused()) |
| { |
| // TODO - Error code |
| closeChannel(AMQConstant.IN_USE, "Queue: '" + queueName + "' is still used."); |
| } |
| else |
| { |
| if (!queue.verifySessionAccess(this)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '" |
| + queue.getName() |
| + "' is exclusive, but not created on this Connection.", getChannelId()); |
| |
| } |
| else |
| { |
| try |
| { |
| int purged = queue.deleteAndReturnCount(); |
| |
| if (!nowait || _connection.isSendQueueDeleteOkRegardless()) |
| { |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| } |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]"); |
| } |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| Queue<?> queue = null; |
| if (queueName == null && (queue = getDefaultQueue()) == null) |
| { |
| |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId()); |
| } |
| else if ((queueName != null) && (queue = getQueue(queueName.toString())) == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist."); |
| } |
| else if (!queue.verifySessionAccess(this)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, |
| "Queue is exclusive, but not created on this Connection.", getChannelId()); |
| } |
| else |
| { |
| try |
| { |
| long purged = queue.clearQueue(); |
| if (!nowait) |
| { |
| sync(); |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| |
| } |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| |
| } |
| |
| } |
| } |
| |
| @Override |
| public void receiveQueueUnbind(final AMQShortString queueName, |
| final AMQShortString exchange, |
| final AMQShortString bindingKey, |
| final FieldTable arguments) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName + |
| " exchange: " + exchange + |
| " bindingKey: " + bindingKey + |
| " arguments: " + arguments + " ]"); |
| } |
| |
| NamedAddressSpace virtualHost = _connection.getAddressSpace(); |
| |
| |
| final boolean useDefaultQueue = queueName == null; |
| final Queue<?> queue = useDefaultQueue |
| ? getDefaultQueue() |
| : getQueue(queueName.toString()); |
| |
| |
| if (queue == null) |
| { |
| String message = useDefaultQueue |
| ? "No default queue defined on channel and queue was null" |
| : "Queue '" + queueName + "' does not exist."; |
| closeChannel(AMQConstant.NOT_FOUND, message); |
| } |
| else if (isDefaultExchange(exchange)) |
| { |
| _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue '" |
| + queue.getName() |
| + "' from the default exchange", getChannelId()); |
| |
| } |
| else |
| { |
| |
| final Exchange<?> exch = getExchange(exchange.toString()); |
| |
| if (exch == null) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "Exchange '" + exchange + "' does not exist."); |
| } |
| else if (!exch.hasBinding(String.valueOf(bindingKey), queue)) |
| { |
| closeChannel(AMQConstant.NOT_FOUND, "No such binding"); |
| } |
| else |
| { |
| try |
| { |
| exch.deleteBinding(String.valueOf(bindingKey), queue); |
| |
| final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody(); |
| sync(); |
| _connection.writeFrame(responseBody.generateFrame(getChannelId())); |
| } |
| catch (AccessControlException e) |
| { |
| _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId()); |
| |
| } |
| } |
| |
| } |
| } |
| |
| @Override |
| public void receiveTxSelect() |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] TxSelect"); |
| } |
| |
| setLocalTransactional(); |
| |
| MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); |
| _connection.writeFrame(responseBody.generateFrame(_channelId)); |
| |
| } |
| |
| @Override |
| public void receiveTxCommit() |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] TxCommit"); |
| } |
| |
| |
| if (!isTransactional()) |
| { |
| closeChannel(AMQConstant.COMMAND_INVALID, |
| "Fatal error: commit called on non-transactional channel"); |
| } |
| commit(new Runnable() |
| { |
| |
| @Override |
| public void run() |
| { |
| _connection.writeFrame(_txCommitOkFrame); |
| } |
| }, true); |
| |
| } |
| |
| @Override |
| public void receiveTxRollback() |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] TxRollback"); |
| } |
| |
| if (!isTransactional()) |
| { |
| closeChannel(AMQConstant.COMMAND_INVALID, |
| "Fatal error: rollback called on non-transactional channel"); |
| } |
| |
| final MethodRegistry methodRegistry = _connection.getMethodRegistry(); |
| final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); |
| |
| Runnable task = new Runnable() |
| { |
| |
| public void run() |
| { |
| _connection.writeFrame(responseBody.generateFrame(_channelId)); |
| } |
| }; |
| |
| rollback(task); |
| |
| //Now resend all the unacknowledged messages back to the original subscribers. |
| //(Must be done after the TxnRollback-ok response). |
| // Why, are we not allowed to send messages back to client before the ok method? |
| resend(); |
| } |
| |
| @Override |
| public void receiveConfirmSelect(final boolean nowait) |
| { |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("RECV[" + _channelId + "] ConfirmSelect [ nowait: " + nowait + " ]"); |
| } |
| _confirmOnPublish = true; |
| |
| if(!nowait) |
| { |
| _connection.writeFrame(new AMQFrame(_channelId, ConfirmSelectOkBody.INSTANCE)); |
| } |
| } |
| |
| |
| private void closeChannel(final AMQConstant cause, final String message) |
| { |
| _connection.closeChannelAndWriteFrame(this, cause, message); |
| } |
| |
| |
| private boolean isDefaultExchange(final AMQShortString exchangeName) |
| { |
| return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName); |
| } |
| |
| private void setDefaultQueue(Queue<?> queue) |
| { |
| Queue<?> currentDefaultQueue = _defaultQueue; |
| if (queue != currentDefaultQueue) |
| { |
| if (currentDefaultQueue != null) |
| { |
| currentDefaultQueue.removeDeleteTask(_defaultQueueAssociationClearingTask); |
| } |
| if (queue != null) |
| { |
| queue.addDeleteTask(_defaultQueueAssociationClearingTask); |
| } |
| } |
| _defaultQueue = queue; |
| } |
| |
| private Queue<?> getDefaultQueue() |
| { |
| return _defaultQueue; |
| } |
| |
| private class DefaultQueueAssociationClearingTask implements Action<Queue<?>> |
| { |
| @Override |
| public void performAction(final Queue<?> queue) |
| { |
| if ( queue == _defaultQueue) |
| { |
| _defaultQueue = null; |
| } |
| } |
| } |
| |
| @Override |
| public boolean processPending() |
| { |
| if (!getAMQPConnection().isIOThread()) |
| { |
| return false; |
| } |
| |
| boolean desiredBlockingState = _blocking.get(); |
| if (desiredBlockingState != _wireBlockingState) |
| { |
| _wireBlockingState = desiredBlockingState; |
| flow(!desiredBlockingState); |
| _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; |
| } |
| |
| boolean consumerListNeedsRefreshing; |
| if(_consumersWithPendingWork.isEmpty()) |
| { |
| _consumersWithPendingWork.addAll(getConsumerTargets()); |
| consumerListNeedsRefreshing = false; |
| } |
| else |
| { |
| consumerListNeedsRefreshing = true; |
| } |
| |
| // QPID-7447: prevent unnecessary allocation of empty iterator |
| Iterator<ConsumerTarget_0_8> iter = _consumersWithPendingWork.isEmpty() ? Collections.<ConsumerTarget_0_8>emptyIterator() : _consumersWithPendingWork.iterator(); |
| |
| boolean consumerHasMoreWork = false; |
| while(iter.hasNext()) |
| { |
| final ConsumerTarget_0_8 target = iter.next(); |
| iter.remove(); |
| if(target.hasPendingWork()) |
| { |
| consumerHasMoreWork = true; |
| target.processPending(); |
| break; |
| } |
| } |
| |
| return consumerHasMoreWork || consumerListNeedsRefreshing; |
| } |
| |
| @Override |
| public void addTicker(final Ticker ticker) |
| { |
| getConnection().getAggregateTicker().addTicker(ticker); |
| // trigger a wakeup to ensure the ticker will be taken into account |
| getAMQPConnection().notifyWork(); |
| } |
| |
| @Override |
| public void removeTicker(final Ticker ticker) |
| { |
| getConnection().getAggregateTicker().removeTicker(ticker); |
| } |
| |
| @Override |
| public void notifyConsumerTargetCurrentStates() |
| { |
| for(ConsumerTarget_0_8 consumerTarget : getConsumerTargets()) |
| { |
| if(!consumerTarget.isPullOnly()) |
| { |
| consumerTarget.notifyCurrentState(); |
| } |
| } |
| } |
| |
| @Override |
| public void ensureConsumersNoticedStateChange() |
| { |
| for (ConsumerTarget_0_8 consumerTarget : getConsumerTargets()) |
| { |
| try |
| { |
| consumerTarget.getSendLock(); |
| } |
| finally |
| { |
| consumerTarget.releaseSendLock(); |
| } |
| } |
| } |
| |
| private Collection<ConsumerTarget_0_8> getConsumerTargets() |
| { |
| return _tag2SubscriptionTargetMap.values(); |
| } |
| |
| private Exchange<?> getExchange(String name) |
| { |
| MessageDestination destination = getAddressSpace().getAttainedMessageDestination(name); |
| return destination instanceof Exchange ? (Exchange<?>) destination : null; |
| } |
| |
| private Queue<?> getQueue(String name) |
| { |
| MessageSource source = getAddressSpace().getAttainedMessageSource(name); |
| return source instanceof Queue ? (Queue<?>) source : null; |
| } |
| |
| public void dispose() |
| { |
| _txCommitOkFrame.dispose(); |
| } |
| } |