| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.AMQSecurityException; |
| import org.apache.qpid.framing.AMQMethodBody; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.BasicContentHeaderProperties; |
| import org.apache.qpid.framing.ContentBody; |
| import org.apache.qpid.framing.ContentHeaderBody; |
| import org.apache.qpid.framing.FieldTable; |
| import org.apache.qpid.framing.MethodRegistry; |
| import org.apache.qpid.framing.abstraction.ContentChunk; |
| import org.apache.qpid.framing.abstraction.MessagePublishInfo; |
| import org.apache.qpid.protocol.AMQConstant; |
| import org.apache.qpid.server.ack.UnacknowledgedMessageMap; |
| import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; |
| import org.apache.qpid.server.configuration.ConfigStore; |
| import org.apache.qpid.server.configuration.ConfiguredObject; |
| import org.apache.qpid.server.configuration.ConnectionConfig; |
| import org.apache.qpid.server.configuration.SessionConfig; |
| import org.apache.qpid.server.configuration.SessionConfigType; |
| import org.apache.qpid.server.exchange.Exchange; |
| import org.apache.qpid.server.flow.FlowCreditManager; |
| import org.apache.qpid.server.flow.Pre0_10CreditManager; |
| import org.apache.qpid.server.logging.LogActor; |
| import org.apache.qpid.server.logging.LogSubject; |
| import org.apache.qpid.server.logging.actors.AMQPChannelActor; |
| import org.apache.qpid.server.logging.actors.CurrentActor; |
| import org.apache.qpid.server.logging.messages.ChannelMessages; |
| import org.apache.qpid.server.logging.messages.ExchangeMessages; |
| import org.apache.qpid.server.logging.subjects.ChannelLogSubject; |
| import org.apache.qpid.server.message.AMQMessage; |
| import org.apache.qpid.server.message.InboundMessage; |
| import org.apache.qpid.server.message.MessageMetaData; |
| import org.apache.qpid.server.message.MessageReference; |
| import org.apache.qpid.server.message.ServerMessage; |
| import org.apache.qpid.server.output.ProtocolOutputConverter; |
| import org.apache.qpid.server.protocol.AMQConnectionModel; |
| import org.apache.qpid.server.protocol.AMQProtocolEngine; |
| import org.apache.qpid.server.protocol.AMQProtocolSession; |
| import org.apache.qpid.server.protocol.AMQSessionModel; |
| import org.apache.qpid.server.queue.AMQQueue; |
| import org.apache.qpid.server.queue.BaseQueue; |
| import org.apache.qpid.server.queue.InboundMessageAdapter; |
| import org.apache.qpid.server.queue.IncomingMessage; |
| import org.apache.qpid.server.queue.QueueEntry; |
| import org.apache.qpid.server.registry.ApplicationRegistry; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StoredMessage; |
| import org.apache.qpid.server.subscription.ClientDeliveryMethod; |
| import org.apache.qpid.server.subscription.RecordDeliveryMethod; |
| import org.apache.qpid.server.subscription.Subscription; |
| import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; |
| import org.apache.qpid.server.subscription.SubscriptionImpl; |
| import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; |
| import org.apache.qpid.server.txn.LocalTransaction; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.virtualhost.VirtualHost; |
| import org.apache.qpid.transport.TransportException; |
| |
| public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder |
| { |
| public static final int DEFAULT_PREFETCH = 4096; |
| |
| private static final Logger _logger = Logger.getLogger(AMQChannel.class); |
| |
| private static final boolean MSG_AUTH = |
| ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); |
| |
| |
| private final int _channelId; |
| |
| |
| private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); |
| |
| /** |
| * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that |
| * value of this represents the <b>last</b> tag sent out |
| */ |
| private long _deliveryTag = 0; |
| |
| /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ |
| private AMQQueue _defaultQueue; |
| |
| /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ |
| private int _consumerTag; |
| |
| /** |
| * The current message - which may be partial in the sense that not all frames have been received yet - which has |
| * been received by this channel. As the frames are received the message gets updated and once all frames have been |
| * received the message can then be routed. |
| */ |
| private IncomingMessage _currentMessage; |
| |
| /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ |
| private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); |
| |
| private final MessageStore _messageStore; |
| |
| private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); |
| |
| private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; |
| |
| private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); |
| |
| // Set of messages being acknoweledged in the current transaction |
| private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>(); |
| |
| private final AtomicBoolean _suspended = new AtomicBoolean(false); |
| |
| private ServerTransaction _transaction; |
| |
| private final AtomicLong _txnStarts = new AtomicLong(0); |
| private final AtomicLong _txnCommits = new AtomicLong(0); |
| private final AtomicLong _txnRejects = new AtomicLong(0); |
| private final AtomicLong _txnCount = new AtomicLong(0); |
| private final AtomicLong _txnUpdateTime = new AtomicLong(0); |
| |
| private final AMQProtocolSession _session; |
| private AtomicBoolean _closing = new AtomicBoolean(false); |
| |
| private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); |
| |
| private final AtomicBoolean _blocking = new AtomicBoolean(false); |
| |
| |
| private LogActor _actor; |
| private LogSubject _logSubject; |
| private volatile boolean _rollingBack; |
| |
| private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; |
| private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); |
| private static final |
| AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); |
| private final UUID _id; |
| private long _createTime = System.currentTimeMillis(); |
| |
| private final ClientDeliveryMethod _clientDeliveryMethod; |
| |
| public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) |
| throws AMQException |
| { |
| _session = session; |
| _channelId = channelId; |
| |
| _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger()); |
| _logSubject = new ChannelLogSubject(this); |
| _id = getConfigStore().createId(); |
| _actor.message(ChannelMessages.CREATE()); |
| |
| getConfigStore().addConfiguredObject(this); |
| |
| _messageStore = messageStore; |
| |
| // by default the session is non-transactional |
| _transaction = new AsyncAutoCommitTransaction(_messageStore, this); |
| |
| _clientDeliveryMethod = session.createDeliveryMethod(_channelId); |
| } |
| |
| public ConfigStore getConfigStore() |
| { |
| return getVirtualHost().getConfigStore(); |
| } |
| |
| /** Sets this channel to be part of a local transaction */ |
| public void setLocalTransactional() |
| { |
| _transaction = new LocalTransaction(_messageStore); |
| _txnStarts.incrementAndGet(); |
| } |
| |
| public boolean isTransactional() |
| { |
| return _transaction.isTransactional(); |
| } |
| |
| public void receivedComplete() |
| { |
| sync(); |
| } |
| |
| |
| public boolean inTransaction() |
| { |
| return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; |
| } |
| |
| private void incrementOutstandingTxnsIfNecessary() |
| { |
| if(isTransactional()) |
| { |
| //There can currently only be at most one outstanding transaction |
| //due to only having LocalTransaction support. Set value to 1 if 0. |
| _txnCount.compareAndSet(0,1); |
| } |
| } |
| |
| private void decrementOutstandingTxnsIfNecessary() |
| { |
| if(isTransactional()) |
| { |
| //There can currently only be at most one outstanding transaction |
| //due to only having LocalTransaction support. Set value to 0 if 1. |
| _txnCount.compareAndSet(1,0); |
| } |
| } |
| |
| public Long getTxnStarts() |
| { |
| return _txnStarts.get(); |
| } |
| |
| public Long getTxnCommits() |
| { |
| return _txnCommits.get(); |
| } |
| |
| public Long getTxnRejects() |
| { |
| return _txnRejects.get(); |
| } |
| |
| public Long getTxnCount() |
| { |
| return _txnCount.get(); |
| } |
| |
| public int getChannelId() |
| { |
| return _channelId; |
| } |
| |
| public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException |
| { |
| String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); |
| if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) |
| { |
| throw new AMQSecurityException("Permission denied: " + e.getName()); |
| } |
| _currentMessage = new IncomingMessage(info, getProtocolSession().getReference()); |
| _currentMessage.setExchange(e); |
| } |
| |
| public void publishContentHeader(ContentHeaderBody contentHeaderBody) |
| throws AMQException |
| { |
| if (_currentMessage == null) |
| { |
| throw new AMQException("Received content header without previously receiving a BasicPublish frame"); |
| } |
| else |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Content header received on channel " + _channelId); |
| } |
| |
| _currentMessage.setContentHeaderBody(contentHeaderBody); |
| |
| _currentMessage.setExpiration(); |
| |
| _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); |
| |
| _currentMessage.route(); |
| |
| deliverCurrentMessageIfComplete(); |
| } |
| } |
| |
| private void deliverCurrentMessageIfComplete() |
| throws AMQException |
| { |
| // check and deliver if header says body length is zero |
| if (_currentMessage.allContentReceived()) |
| { |
| try |
| { |
| final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); |
| |
| if(!checkMessageUserId(_currentMessage.getContentHeader())) |
| { |
| _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); |
| } |
| else |
| { |
| if(destinationQueues == null || destinationQueues.isEmpty()) |
| { |
| if (_currentMessage.isMandatory() || _currentMessage.isImmediate()) |
| { |
| _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", _currentMessage)); |
| } |
| else |
| { |
| _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); |
| } |
| } |
| else |
| { |
| final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData()); |
| _currentMessage.setStoredMessage(handle); |
| int bodyCount = _currentMessage.getBodyCount(); |
| if(bodyCount > 0) |
| { |
| long bodyLengthReceived = 0; |
| for(int i = 0 ; i < bodyCount ; i++) |
| { |
| ContentChunk contentChunk = _currentMessage.getContentChunk(i); |
| handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); |
| bodyLengthReceived += contentChunk.getSize(); |
| } |
| } |
| |
| _transaction.addPostTransactionAction(new ServerTransaction.Action() |
| { |
| public void postCommit() |
| { |
| } |
| |
| public void onRollback() |
| { |
| handle.remove(); |
| } |
| }); |
| |
| _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime()); |
| incrementOutstandingTxnsIfNecessary(); |
| updateTransactionalActivity(); |
| _currentMessage.getStoredMessage().flushToStore(); |
| } |
| } |
| } |
| finally |
| { |
| long bodySize = _currentMessage.getSize(); |
| long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); |
| _session.registerMessageReceived(bodySize, timestamp); |
| _currentMessage = null; |
| } |
| } |
| |
| } |
| |
| public void publishContentBody(ContentBody contentBody) throws AMQException |
| { |
| if (_currentMessage == null) |
| { |
| throw new AMQException("Received content body without previously receiving a JmsPublishBody"); |
| } |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug(debugIdentity() + "Content body received on channel " + _channelId); |
| } |
| |
| try |
| { |
| final ContentChunk contentChunk = |
| _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); |
| |
| _currentMessage.addContentBodyFrame(contentChunk); |
| |
| deliverCurrentMessageIfComplete(); |
| } |
| catch (AMQException e) |
| { |
| // we want to make sure we don't keep a reference to the message in the |
| // event of an error |
| _currentMessage = null; |
| throw e; |
| } |
| catch (RuntimeException e) |
| { |
| // we want to make sure we don't keep a reference to the message in the |
| // event of an error |
| _currentMessage = null; |
| throw e; |
| } |
| } |
| |
| public long getNextDeliveryTag() |
| { |
| return ++_deliveryTag; |
| } |
| |
| public int getNextConsumerTag() |
| { |
| return ++_consumerTag; |
| } |
| |
| |
| public Subscription getSubscription(AMQShortString subscription) |
| { |
| return _tag2SubscriptionMap.get(subscription); |
| } |
| |
| /** |
| * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean |
| * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. |
| * |
| * @param tag the tag chosen by the client (if null, server will generate one) |
| * @param queue the queue to subscribe to |
| * @param acks Are acks enabled for this subscriber |
| * @param filters Filters to apply to this subscriber |
| * |
| * @param noLocal Flag stopping own messages being receivied. |
| * @param exclusive Flag requesting exclusive access to the queue |
| * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests |
| * |
| * @throws AMQException if something goes wrong |
| */ |
| public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, |
| FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException |
| { |
| if (tag == null) |
| { |
| tag = new AMQShortString("sgen_" + getNextConsumerTag()); |
| } |
| |
| if (_tag2SubscriptionMap.containsKey(tag)) |
| { |
| throw new AMQException("Consumer already exists with same tag: " + tag); |
| } |
| |
| Subscription subscription = |
| SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); |
| |
| |
| // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. |
| // We add before we register as the Async Delivery process may AutoClose the subscriber |
| // so calling _cT2QM.remove before we have done put which was after the register succeeded. |
| // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. |
| |
| _tag2SubscriptionMap.put(tag, subscription); |
| |
| try |
| { |
| queue.registerSubscription(subscription, exclusive); |
| } |
| catch (AMQException e) |
| { |
| _tag2SubscriptionMap.remove(tag); |
| throw e; |
| } |
| catch (RuntimeException e) |
| { |
| _tag2SubscriptionMap.remove(tag); |
| throw e; |
| } |
| return tag; |
| } |
| |
| /** |
| * Unsubscribe a consumer from a queue. |
| * @param consumerTag |
| * @return true if the consumerTag had a mapped queue that could be unregistered. |
| * @throws AMQException |
| */ |
| public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException |
| { |
| |
| Subscription sub = _tag2SubscriptionMap.remove(consumerTag); |
| if (sub != null) |
| { |
| try |
| { |
| sub.getSendLock(); |
| sub.getQueue().unregisterSubscription(sub); |
| } |
| finally |
| { |
| sub.releaseSendLock(); |
| } |
| return true; |
| } |
| else |
| { |
| _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); |
| } |
| return false; |
| } |
| |
| /** |
| * Called from the protocol session to close this channel and clean up. T |
| * |
| * @throws AMQException if there is an error during closure |
| */ |
| public void close() throws AMQException |
| { |
| if(!_closing.compareAndSet(false, true)) |
| { |
| //Channel is already closing |
| return; |
| } |
| |
| CurrentActor.get().message(_logSubject, ChannelMessages.CLOSE()); |
| |
| unsubscribeAllConsumers(); |
| _transaction.rollback(); |
| |
| try |
| { |
| requeue(); |
| } |
| catch (AMQException e) |
| { |
| _logger.error("Caught AMQException whilst attempting to requeue:" + e); |
| } |
| catch (TransportException e) |
| { |
| _logger.error("Caught TransportException whilst attempting to requeue:" + e); |
| } |
| |
| getConfigStore().removeConfiguredObject(this); |
| |
| } |
| |
| private void unsubscribeAllConsumers() throws AMQException |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| if (!_tag2SubscriptionMap.isEmpty()) |
| { |
| _logger.info("Unsubscribing all consumers on channel " + toString()); |
| } |
| else |
| { |
| _logger.info("No consumers to unsubscribe on channel " + toString()); |
| } |
| } |
| |
| for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); |
| } |
| |
| Subscription sub = me.getValue(); |
| |
| try |
| { |
| sub.getSendLock(); |
| sub.getQueue().unregisterSubscription(sub); |
| } |
| finally |
| { |
| sub.releaseSendLock(); |
| } |
| |
| } |
| |
| _tag2SubscriptionMap.clear(); |
| } |
| |
| /** |
| * Add a message to the channel-based list of unacknowledged messages |
| * |
| * @param entry the record of the message on the queue that was delivered |
| * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the |
| * delivery tag) |
| * @param subscription The consumer that is to acknowledge this message. |
| */ |
| public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| if (entry.getQueue() == null) |
| { |
| _logger.debug("Adding unacked message with a null queue:" + entry); |
| } |
| else |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag |
| + ") with a queue(" + entry.getQueue() + ") for " + subscription); |
| } |
| } |
| } |
| |
| _unacknowledgedMessageMap.add(deliveryTag, entry); |
| |
| } |
| |
| private final String id = "(" + System.identityHashCode(this) + ")"; |
| |
| public String debugIdentity() |
| { |
| return _channelId + id; |
| } |
| |
| /** |
| * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to |
| * this same channel or to other subscribers. |
| * |
| * @throws org.apache.qpid.AMQException if the requeue fails |
| */ |
| public void requeue() throws AMQException |
| { |
| // we must create a new map since all the messages will get a new delivery tag when they are redelivered |
| Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); |
| |
| if (!messagesToBeDelivered.isEmpty()) |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString()); |
| } |
| |
| } |
| |
| for (QueueEntry unacked : messagesToBeDelivered) |
| { |
| if (!unacked.isQueueDeleted()) |
| { |
| // Mark message redelivered |
| unacked.setRedelivered(); |
| |
| // Ensure message is released for redelivery |
| unacked.release(); |
| |
| } |
| else |
| { |
| unacked.discard(); |
| } |
| } |
| |
| } |
| |
| /** |
| * Requeue a single message |
| * |
| * @param deliveryTag The message to requeue |
| * |
| * @throws AMQException If something goes wrong. |
| */ |
| public void requeue(long deliveryTag) throws AMQException |
| { |
| QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); |
| |
| if (unacked != null) |
| { |
| // Mark message redelivered |
| unacked.setRedelivered(); |
| |
| // Ensure message is released for redelivery |
| if (!unacked.isQueueDeleted()) |
| { |
| |
| // Ensure message is released for redelivery |
| unacked.release(); |
| |
| } |
| else |
| { |
| _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked |
| + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); |
| |
| unacked.discard(); |
| } |
| } |
| else |
| { |
| _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." |
| + _unacknowledgedMessageMap.size()); |
| |
| } |
| |
| } |
| |
| public boolean isMaxDeliveryCountEnabled(final long deliveryTag) |
| { |
| final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); |
| if (queueEntry != null) |
| { |
| final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); |
| return maximumDeliveryCount > 0; |
| } |
| |
| return false; |
| } |
| |
| public boolean isDeliveredTooManyTimes(final long deliveryTag) |
| { |
| final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); |
| if (queueEntry != null) |
| { |
| final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); |
| final int numDeliveries = queueEntry.getDeliveryCount(); |
| return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Called to resend all outstanding unacknowledged messages to this same channel. |
| * |
| * @param requeue Are the messages to be requeued or dropped. |
| * |
| * @throws AMQException When something goes wrong. |
| */ |
| public void resend(final boolean requeue) throws AMQException |
| { |
| |
| |
| final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); |
| final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size()); |
| } |
| |
| // Process the Unacked-Map. |
| // Marking messages who still have a consumer for to be resent |
| // and those that don't to be requeued. |
| _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, |
| msgToRequeue, |
| msgToResend, |
| requeue, |
| _messageStore)); |
| |
| |
| // Process Messages to Resend |
| if (_logger.isDebugEnabled()) |
| { |
| if (!msgToResend.isEmpty()) |
| { |
| _logger.debug("Preparing (" + msgToResend.size() + ") message to resend."); |
| } |
| else |
| { |
| _logger.debug("No message to resend."); |
| } |
| } |
| |
| for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) |
| { |
| QueueEntry message = entry.getValue(); |
| long deliveryTag = entry.getKey(); |
| |
| //Amend the delivery counter as the client hasn't seen these messages yet. |
| message.decrementDeliveryCount(); |
| |
| AMQQueue queue = message.getQueue(); |
| |
| // Without any details from the client about what has been processed we have to mark |
| // all messages in the unacked map as redelivered. |
| message.setRedelivered(); |
| |
| Subscription sub = message.getDeliveredSubscription(); |
| |
| if (sub != null) |
| { |
| |
| if(!queue.resend(message,sub)) |
| { |
| msgToRequeue.put(deliveryTag, message); |
| } |
| } |
| else |
| { |
| |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() |
| + ")to prevent loss"); |
| } |
| // move this message to requeue |
| msgToRequeue.put(deliveryTag, message); |
| } |
| } // for all messages |
| // } else !isSuspend |
| |
| if (_logger.isInfoEnabled()) |
| { |
| if (!msgToRequeue.isEmpty()) |
| { |
| _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to."); |
| } |
| } |
| |
| // Process Messages to Requeue at the front of the queue |
| for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) |
| { |
| QueueEntry message = entry.getValue(); |
| long deliveryTag = entry.getKey(); |
| |
| //Amend the delivery counter as the client hasn't seen these messages yet. |
| message.decrementDeliveryCount(); |
| |
| _unacknowledgedMessageMap.remove(deliveryTag); |
| |
| message.setRedelivered(); |
| message.release(); |
| |
| } |
| } |
| |
| |
| /** |
| * Acknowledge one or more messages. |
| * |
| * @param deliveryTag the last delivery tag |
| * @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only |
| * acknowledges the single message specified by the delivery tag |
| * |
| * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel |
| */ |
| public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException |
| { |
| Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); |
| _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); |
| updateTransactionalActivity(); |
| } |
| |
| private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) |
| { |
| |
| return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple); |
| |
| } |
| |
| /** |
| * Used only for testing purposes. |
| * |
| * @return the map of unacknowledged messages |
| */ |
| public UnacknowledgedMessageMap getUnacknowledgedMessageMap() |
| { |
| return _unacknowledgedMessageMap; |
| } |
| |
| /** |
| * Called from the ChannelFlowHandler to suspend this Channel |
| * @param suspended boolean, should this Channel be suspended |
| */ |
| public void setSuspended(boolean suspended) |
| { |
| boolean wasSuspended = _suspended.getAndSet(suspended); |
| if (wasSuspended != suspended) |
| { |
| // Log Flow Started before we start the subscriptions |
| if (!suspended) |
| { |
| _actor.message(_logSubject, ChannelMessages.FLOW("Started")); |
| } |
| |
| |
| // This section takes two different approaches to perform to perform |
| // the same function. Ensuring that the Subscription has taken note |
| // of the change in Channel State |
| |
| // Here we have become unsuspended and so we ask each the queue to |
| // perform an Async delivery for each of the subscriptions in this |
| // Channel. The alternative would be to ensure that the subscription |
| // had received the change in suspension state. That way the logic |
| // behind decieding to start an async delivery was located with the |
| // Subscription. |
| if (wasSuspended) |
| { |
| // may need to deliver queued messages |
| for (Subscription s : _tag2SubscriptionMap.values()) |
| { |
| s.getQueue().deliverAsync(s); |
| } |
| } |
| |
| |
| // Here we have become suspended so we need to ensure that each of |
| // the Subscriptions has noticed this change so that we can be sure |
| // they are not still sending messages. Again the code here is a |
| // very simplistic approach to ensure that the change of suspension |
| // has been noticed by each of the Subscriptions. Unlike the above |
| // case we don't actually need to do anything else. |
| if (!wasSuspended) |
| { |
| // may need to deliver queued messages |
| for (Subscription s : _tag2SubscriptionMap.values()) |
| { |
| try |
| { |
| s.getSendLock(); |
| } |
| finally |
| { |
| s.releaseSendLock(); |
| } |
| } |
| } |
| |
| |
| // Log Suspension only after we have confirmed all suspensions are |
| // stopped. |
| if (suspended) |
| { |
| _actor.message(_logSubject, ChannelMessages.FLOW("Stopped")); |
| } |
| |
| } |
| } |
| |
| public boolean isSuspended() |
| { |
| return _suspended.get() || _closing.get() || _session.isClosing(); |
| } |
| |
| public void commit() throws AMQException |
| { |
| commit(null); |
| } |
| public void commit(Runnable immediateAction) throws AMQException |
| { |
| |
| if (!isTransactional()) |
| { |
| throw new AMQException("Fatal error: commit called on non-transactional channel"); |
| } |
| |
| _transaction.commit(immediateAction); |
| |
| _txnCommits.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| } |
| |
| public void rollback() throws AMQException |
| { |
| rollback(NULL_TASK); |
| } |
| |
| public void rollback(Runnable postRollbackTask) throws AMQException |
| { |
| if (!isTransactional()) |
| { |
| throw new AMQException("Fatal error: commit called on non-transactional channel"); |
| } |
| |
| // stop all subscriptions |
| _rollingBack = true; |
| boolean requiresSuspend = _suspended.compareAndSet(false,true); |
| |
| // ensure all subscriptions have seen the change to the channel state |
| for(Subscription sub : _tag2SubscriptionMap.values()) |
| { |
| sub.getSendLock(); |
| sub.releaseSendLock(); |
| } |
| |
| try |
| { |
| _transaction.rollback(); |
| } |
| finally |
| { |
| _rollingBack = false; |
| |
| _txnRejects.incrementAndGet(); |
| _txnStarts.incrementAndGet(); |
| decrementOutstandingTxnsIfNecessary(); |
| } |
| |
| postRollbackTask.run(); |
| |
| for(QueueEntry entry : _resendList) |
| { |
| Subscription sub = entry.getDeliveredSubscription(); |
| if(sub == null || sub.isClosed()) |
| { |
| entry.release(); |
| } |
| else |
| { |
| sub.getQueue().resend(entry, sub); |
| } |
| } |
| _resendList.clear(); |
| |
| if(requiresSuspend) |
| { |
| _suspended.set(false); |
| for(Subscription sub : _tag2SubscriptionMap.values()) |
| { |
| sub.getQueue().deliverAsync(sub); |
| } |
| |
| } |
| |
| |
| } |
| |
| /** |
| * Update last transaction activity timestamp |
| */ |
| private void updateTransactionalActivity() |
| { |
| if (isTransactional()) |
| { |
| _txnUpdateTime.set(getProtocolSession().getLastReceivedTime()); |
| } |
| } |
| |
| public String toString() |
| { |
| return "["+_session.toString()+":"+_channelId+"]"; |
| } |
| |
| public void setDefaultQueue(AMQQueue queue) |
| { |
| _defaultQueue = queue; |
| } |
| |
| public AMQQueue getDefaultQueue() |
| { |
| return _defaultQueue; |
| } |
| |
| |
| public boolean isClosing() |
| { |
| return _closing.get(); |
| } |
| |
| public AMQProtocolSession getProtocolSession() |
| { |
| return _session; |
| } |
| |
| public FlowCreditManager getCreditManager() |
| { |
| return _creditManager; |
| } |
| |
| public void setCredit(final long prefetchSize, final int prefetchCount) |
| { |
| _actor.message(ChannelMessages.PREFETCH_SIZE(prefetchSize, prefetchCount)); |
| _creditManager.setCreditLimits(prefetchSize, prefetchCount); |
| } |
| |
| public MessageStore getMessageStore() |
| { |
| return _messageStore; |
| } |
| |
| public ClientDeliveryMethod getClientDeliveryMethod() |
| { |
| return _clientDeliveryMethod; |
| } |
| |
| private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() |
| { |
| |
| public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) |
| { |
| addUnacknowledgedMessage(entry, deliveryTag, sub); |
| } |
| }; |
| |
| public RecordDeliveryMethod getRecordDeliveryMethod() |
| { |
| return _recordDeliveryMethod; |
| } |
| |
| |
| private AMQMessage createAMQMessage(IncomingMessage incomingMessage) |
| throws AMQException |
| { |
| |
| AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); |
| |
| message.setExpiration(incomingMessage.getExpiration()); |
| message.setConnectionIdentifier(_session.getReference()); |
| return message; |
| } |
| |
| private boolean checkMessageUserId(ContentHeaderBody header) |
| { |
| AMQShortString userID = |
| header.getProperties() instanceof BasicContentHeaderProperties |
| ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() |
| : null; |
| |
| return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); |
| |
| } |
| |
| public Object getID() |
| { |
| return _channelId; |
| } |
| |
| public AMQConnectionModel getConnectionModel() |
| { |
| return _session; |
| } |
| |
| public String getClientID() |
| { |
| return String.valueOf(_session.getContextKey()); |
| } |
| |
| public LogSubject getLogSubject() |
| { |
| return _logSubject; |
| } |
| |
| private class MessageDeliveryAction implements ServerTransaction.Action |
| { |
| private IncomingMessage _incommingMessage; |
| private List<? extends BaseQueue> _destinationQueues; |
| |
| public MessageDeliveryAction(IncomingMessage currentMessage, |
| List<? extends BaseQueue> destinationQueues) |
| { |
| _incommingMessage = currentMessage; |
| _destinationQueues = destinationQueues; |
| } |
| |
| public void postCommit() |
| { |
| try |
| { |
| final boolean immediate = _incommingMessage.isImmediate(); |
| |
| final AMQMessage amqMessage = createAMQMessage(_incommingMessage); |
| MessageReference ref = amqMessage.newReference(); |
| |
| for(int i = 0; i < _destinationQueues.size(); i++) |
| { |
| BaseQueue queue = _destinationQueues.get(i); |
| |
| BaseQueue.PostEnqueueAction action; |
| |
| if(immediate) |
| { |
| action = new ImmediateAction(queue); |
| } |
| else |
| { |
| action = null; |
| } |
| |
| queue.enqueue(amqMessage, isTransactional(), action); |
| |
| if(queue instanceof AMQQueue) |
| { |
| ((AMQQueue)queue).checkCapacity(AMQChannel.this); |
| } |
| |
| } |
| |
| _incommingMessage.getStoredMessage().flushToStore(); |
| ref.release(); |
| } |
| catch (AMQException e) |
| { |
| // TODO |
| throw new RuntimeException(e); |
| } |
| |
| |
| |
| |
| |
| } |
| |
| public void onRollback() |
| { |
| // Maybe keep track of entries that were created and then delete them here in case of failure |
| // to in memory enqueue |
| } |
| |
| private class ImmediateAction implements BaseQueue.PostEnqueueAction |
| { |
| private final BaseQueue _queue; |
| |
| public ImmediateAction(BaseQueue queue) |
| { |
| _queue = queue; |
| } |
| |
| public void onEnqueue(QueueEntry entry) |
| { |
| if (!entry.getDeliveredToConsumer() && entry.acquire()) |
| { |
| |
| |
| ServerTransaction txn = new LocalTransaction(_messageStore); |
| Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); |
| entries.add(entry); |
| final AMQMessage message = (AMQMessage) entry.getMessage(); |
| txn.dequeue(_queue, entry.getMessage(), |
| new MessageAcknowledgeAction(entries) |
| { |
| @Override |
| public void postCommit() |
| { |
| try |
| { |
| final |
| ProtocolOutputConverter outputConverter = |
| _session.getProtocolOutputConverter(); |
| |
| outputConverter.writeReturn(message.getMessagePublishInfo(), |
| message.getContentHeaderBody(), |
| message, |
| _channelId, |
| AMQConstant.NO_CONSUMERS.getCode(), |
| IMMEDIATE_DELIVERY_REPLY_TEXT); |
| } |
| catch (AMQException e) |
| { |
| throw new RuntimeException(e); |
| } |
| super.postCommit(); |
| } |
| } |
| ); |
| txn.commit(); |
| |
| |
| } |
| |
| } |
| } |
| } |
| |
| private class MessageAcknowledgeAction implements ServerTransaction.Action |
| { |
| private final Collection<QueueEntry> _ackedMessages; |
| |
| public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) |
| { |
| _ackedMessages = ackedMessages; |
| } |
| |
| public void postCommit() |
| { |
| try |
| { |
| for(QueueEntry entry : _ackedMessages) |
| { |
| entry.discard(); |
| } |
| } |
| finally |
| { |
| _acknowledgedMessages.clear(); |
| } |
| |
| } |
| |
| public void onRollback() |
| { |
| // explicit rollbacks resend the message after the rollback-ok is sent |
| if(_rollingBack) |
| { |
| _resendList.addAll(_ackedMessages); |
| } |
| else |
| { |
| try |
| { |
| for(QueueEntry entry : _ackedMessages) |
| { |
| entry.release(); |
| } |
| } |
| finally |
| { |
| _acknowledgedMessages.clear(); |
| } |
| } |
| |
| } |
| } |
| |
| private class WriteReturnAction implements ServerTransaction.Action |
| { |
| private final AMQConstant _errorCode; |
| private final IncomingMessage _message; |
| private final String _description; |
| |
| public WriteReturnAction(AMQConstant errorCode, |
| String description, |
| IncomingMessage message) |
| { |
| _errorCode = errorCode; |
| _message = message; |
| _description = description; |
| } |
| |
| public void postCommit() |
| { |
| try |
| { |
| _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(), |
| _message.getContentHeader(), |
| _message, |
| _channelId, |
| _errorCode.getCode(), |
| new AMQShortString(_description)); |
| } |
| catch (AMQException e) |
| { |
| //TODO |
| throw new RuntimeException(e); |
| } |
| |
| } |
| |
| public void onRollback() |
| { |
| //To change body of implemented methods use File | Settings | File Templates. |
| } |
| } |
| |
| |
| public LogActor getLogActor() |
| { |
| return _actor; |
| } |
| |
| public void block(AMQQueue queue) |
| { |
| if(_blockingQueues.add(queue)) |
| { |
| |
| if(_blocking.compareAndSet(false,true)) |
| { |
| _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); |
| flow(false); |
| } |
| } |
| } |
| |
| public void unblock(AMQQueue queue) |
| { |
| if(_blockingQueues.remove(queue)) |
| { |
| if(_blocking.compareAndSet(true,false)) |
| { |
| _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); |
| |
| flow(true); |
| } |
| } |
| } |
| |
| public boolean onSameConnection(InboundMessage inbound) |
| { |
| if(inbound instanceof IncomingMessage) |
| { |
| IncomingMessage incoming = (IncomingMessage) inbound; |
| return getProtocolSession().getReference() == incoming.getConnectionReference(); |
| } |
| return false; |
| } |
| |
| private void flow(boolean flow) |
| { |
| MethodRegistry methodRegistry = _session.getMethodRegistry(); |
| AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); |
| _session.writeFrame(responseBody.generateFrame(_channelId)); |
| } |
| |
| public boolean getBlocking() |
| { |
| return _blocking.get(); |
| } |
| |
| public VirtualHost getVirtualHost() |
| { |
| return getProtocolSession().getVirtualHost(); |
| } |
| |
| |
| public ConfiguredObject getParent() |
| { |
| return getVirtualHost(); |
| } |
| |
| public SessionConfigType getConfigType() |
| { |
| return SessionConfigType.getInstance(); |
| } |
| |
| public int getChannel() |
| { |
| return getChannelId(); |
| } |
| |
| public boolean isAttached() |
| { |
| return true; |
| } |
| |
| public long getDetachedLifespan() |
| { |
| return 0; |
| } |
| |
| public ConnectionConfig getConnectionConfig() |
| { |
| return (AMQProtocolEngine)getProtocolSession(); |
| } |
| |
| public Long getExpiryTime() |
| { |
| return null; |
| } |
| |
| public Long getMaxClientRate() |
| { |
| return null; |
| } |
| |
| public boolean isDurable() |
| { |
| return false; |
| } |
| |
| public UUID getId() |
| { |
| return _id; |
| } |
| |
| public String getSessionName() |
| { |
| return getConnectionConfig().getAddress() + "/" + getChannelId(); |
| } |
| |
| public long getCreateTime() |
| { |
| return _createTime; |
| } |
| |
| public void mgmtClose() throws AMQException |
| { |
| _session.mgmtCloseChannel(_channelId); |
| } |
| |
| public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException |
| { |
| if (inTransaction()) |
| { |
| long currentTime = System.currentTimeMillis(); |
| long openTime = currentTime - _transaction.getTransactionStartTime(); |
| long idleTime = currentTime - _txnUpdateTime.get(); |
| |
| // Log a warning on idle or open transactions |
| if (idleWarn > 0L && idleTime > idleWarn) |
| { |
| CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime)); |
| _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms"); |
| } |
| else if (openWarn > 0L && openTime > openWarn) |
| { |
| CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime)); |
| _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); |
| } |
| |
| // Close connection for idle or open transactions that have timed out |
| if (idleClose > 0L && idleTime > idleClose) |
| { |
| getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); |
| } |
| else if (openClose > 0L && openTime > openClose) |
| { |
| getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); |
| } |
| } |
| } |
| |
| public void deadLetter(long deliveryTag) throws AMQException |
| { |
| final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); |
| final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); |
| |
| if (rejectedQueueEntry == null) |
| { |
| _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); |
| return; |
| } |
| else |
| { |
| final ServerMessage msg = rejectedQueueEntry.getMessage(); |
| |
| final AMQQueue queue = rejectedQueueEntry.getQueue(); |
| |
| final Exchange altExchange = queue.getAlternateExchange(); |
| unackedMap.remove(deliveryTag); |
| |
| if (altExchange == null) |
| { |
| _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); |
| _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); |
| rejectedQueueEntry.discard(); |
| return; |
| } |
| |
| final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); |
| |
| final List<? extends BaseQueue> destinationQueues = altExchange.route(m); |
| |
| if (destinationQueues == null || destinationQueues.isEmpty()) |
| { |
| _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); |
| _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); |
| rejectedQueueEntry.discard(); |
| return; |
| } |
| |
| rejectedQueueEntry.routeToAlternate(); |
| |
| //output operational logging for each delivery post commit |
| for (final BaseQueue destinationQueue : destinationQueues) |
| { |
| _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); |
| } |
| |
| } |
| } |
| |
| public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action) |
| { |
| _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); |
| } |
| |
| public void completeAsyncCommands() |
| { |
| AsyncCommand cmd; |
| while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) |
| { |
| cmd.complete(); |
| _unfinishedCommandsQueue.poll(); |
| } |
| while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) |
| { |
| cmd = _unfinishedCommandsQueue.poll(); |
| cmd.awaitReadyForCompletion(); |
| cmd.complete(); |
| } |
| } |
| |
| |
| public void sync() |
| { |
| AsyncCommand cmd; |
| while((cmd = _unfinishedCommandsQueue.poll()) != null) |
| { |
| cmd.awaitReadyForCompletion(); |
| cmd.complete(); |
| } |
| } |
| |
| private static class AsyncCommand |
| { |
| private final MessageStore.StoreFuture _future; |
| private ServerTransaction.Action _action; |
| |
| public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action) |
| { |
| _future = future; |
| _action = action; |
| } |
| |
| void awaitReadyForCompletion() |
| { |
| _future.waitForCompletion(); |
| } |
| |
| void complete() |
| { |
| if(!_future.isComplete()) |
| { |
| _future.waitForCompletion(); |
| } |
| _action.postCommit(); |
| _action = null; |
| } |
| |
| boolean isReadyForCompletion() |
| { |
| return _future.isComplete(); |
| } |
| } |
| |
| public int compareTo(AMQSessionModel session) |
| { |
| return getId().toString().compareTo(session.getID().toString()); |
| } |
| } |