| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.client; |
| |
| import java.io.Serializable; |
| import java.lang.ref.WeakReference; |
| import java.net.URISyntaxException; |
| import java.text.MessageFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.UUID; |
| import java.util.WeakHashMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import javax.jms.*; |
| import javax.jms.IllegalStateException; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.AMQChannelClosedException; |
| import org.apache.qpid.AMQDisconnectedException; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.AMQInvalidArgumentException; |
| import org.apache.qpid.AMQInvalidRoutingKeyException; |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.AMQDestination.DestSyntax; |
| import org.apache.qpid.client.failover.FailoverException; |
| import org.apache.qpid.client.failover.FailoverNoopSupport; |
| import org.apache.qpid.client.failover.FailoverProtectedOperation; |
| import org.apache.qpid.client.failover.FailoverRetrySupport; |
| import org.apache.qpid.client.message.AMQMessageDelegateFactory; |
| import org.apache.qpid.client.message.AMQPEncodedListMessage; |
| import org.apache.qpid.client.message.AMQPEncodedMapMessage; |
| import org.apache.qpid.client.message.AbstractJMSMessage; |
| import org.apache.qpid.client.message.CloseConsumerMessage; |
| import org.apache.qpid.client.message.JMSBytesMessage; |
| import org.apache.qpid.client.message.JMSMapMessage; |
| import org.apache.qpid.client.message.JMSObjectMessage; |
| import org.apache.qpid.client.message.JMSStreamMessage; |
| import org.apache.qpid.client.message.JMSTextMessage; |
| import org.apache.qpid.client.message.MessageEncryptionHelper; |
| import org.apache.qpid.client.message.MessageFactoryRegistry; |
| import org.apache.qpid.client.message.UnprocessedMessage; |
| import org.apache.qpid.client.messaging.address.Link; |
| import org.apache.qpid.client.messaging.address.Node; |
| import org.apache.qpid.client.util.FlowControllingBlockingQueue; |
| import org.apache.qpid.client.util.JMSExceptionHelper; |
| import org.apache.qpid.common.AMQPFilterTypes; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.exchange.ExchangeDefaults; |
| import org.apache.qpid.jms.ListMessage; |
| import org.apache.qpid.jms.Session; |
| import org.apache.qpid.protocol.ErrorCodes; |
| import org.apache.qpid.thread.Threading; |
| import org.apache.qpid.transport.SessionException; |
| import org.apache.qpid.transport.TransportException; |
| import org.apache.qpid.util.Strings; |
| |
| /* |
| * TODO Different FailoverSupport implementations are needed on the same method call, in different situations. For |
| * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second |
| * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of |
| * the fail-over process, the retry handler could be used to automatically retry the operation once the connection |
| * has been reestablished. All fail-over protected operations should be placed in private methods, with |
| * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the |
| * fail-over process sets a nowait flag and uses an async method call instead. |
| * TODO Two new objects created on every failover supported method call. Consider more efficient ways of doing this, |
| * after looking at worse bottlenecks first. |
| */ |
| public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession |
| { |
| /** Used for debugging. */ |
| private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); |
| |
| /** System property to configure dispatcher shutdown timeout in milliseconds. */ |
| public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS = "DISPATCHER_SHUTDOWN_TIMEOUT_MS"; |
| /** Dispatcher shutdown timeout default setting. */ |
| public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT = "1000"; |
| |
| /** System property to enable strict AMQP compliance. */ |
| public static final String STRICT_AMQP = "STRICT_AMQP"; |
| |
| /** Strict AMQP default setting. */ |
| public static final String STRICT_AMQP_DEFAULT = "false"; |
| |
| /** System property to enable failure if strict AMQP compliance is violated. */ |
| public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL"; |
| |
| /** Strict AMQP failure default. */ |
| public static final String STRICT_AMQP_FATAL_DEFAULT = "true"; |
| |
| /** System property to enable immediate message prefetching. */ |
| public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; |
| |
| /** Immediate message prefetch default. */ |
| public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; |
| |
| private final boolean _declareQueues = |
| Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); |
| |
| private final boolean _declareExchanges = |
| Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true")); |
| |
| private final boolean _bindQueues = |
| Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "true")); |
| |
| private final boolean _useAMQPEncodedMapMessage; |
| |
| private final boolean _useAMQPEncodedStreamMessage; |
| |
| /** |
| * Flag indicating to start dispatcher as a daemon thread |
| */ |
| protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); |
| |
| private final Map<AMQDestination, WeakReference<AMQDestination>> |
| _resolvedDestinations = Collections.synchronizedMap(new WeakHashMap<AMQDestination, WeakReference<AMQDestination>> ()); |
| |
| private final long _dispatcherShutdownTimeoutMs; |
| |
| /** The connection to which this session belongs. */ |
| private AMQConnection _connection; |
| |
| /** Used to indicate whether or not this is a transactional session. */ |
| private final boolean _transacted; |
| |
| /** Holds the sessions acknowledgement mode. */ |
| private final int _acknowledgeMode; |
| |
| /** Holds this session unique identifier, used to distinguish it from other sessions. */ |
| private int _channelId; |
| |
| private int _ticket; |
| |
| /** Holds the high mark for prefetched message, at which the session is suspended. */ |
| private final int _prefetchHighMark; |
| |
| /** Holds the low mark for prefetched messages, below which the session is resumed. */ |
| private final int _prefetchLowMark; |
| |
| /** Holds the message listener, if any, which is attached to this session. */ |
| private MessageListener _messageListener = null; |
| |
| /** Used to indicate that this session has been started at least once. */ |
| private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); |
| |
| private final ConcurrentMap<String, TopicSubscriberAdaptor<C>> _subscriptions = |
| new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); |
| |
| private final ConcurrentMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); |
| |
| private final Lock _subscriberDetails = new ReentrantLock(true); |
| private final Lock _subscriberAccess = new ReentrantLock(true); |
| |
| private final FlowControllingBlockingQueue<Dispatchable> _queue; |
| |
| private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); |
| private final AtomicLong _rollbackMark = new AtomicLong(-1); |
| |
| private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); |
| |
| private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); |
| |
| private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); |
| |
| private volatile Dispatcher _dispatcher; |
| |
| private volatile Thread _dispatcherThread; |
| |
| private MessageFactoryRegistry _messageFactoryRegistry; |
| |
| /** Holds all of the producers created by this session, keyed by their unique identifiers. */ |
| private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); |
| |
| /** |
| * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume |
| * methods. |
| */ |
| private int _nextTag = 1; |
| |
| private final Map<String,C> _consumers = new ConcurrentHashMap<>(); |
| |
| /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ |
| private ConcurrentMap<Destination, AtomicInteger> _destinationConsumerCount = |
| new ConcurrentHashMap<Destination, AtomicInteger>(); |
| |
| /** |
| * Used as a source of unique identifiers for producers within the session. |
| * |
| * <p/> Access to this id does not require to be synchronized since according to the JMS specification only one |
| * thread of control is allowed to create producers for any given session instance. |
| */ |
| private long _nextProducerId; |
| |
| /** |
| * Set when recover is called. This is to handle the case where recover() is called by application code during |
| * onMessage() processing to ensure that an auto ack is not sent. |
| */ |
| private volatile boolean _sessionInRecovery; |
| |
| private volatile boolean _usingDispatcherForCleanup; |
| |
| /** Used to indicates that the connection to which this session belongs, has been stopped. */ |
| private final AtomicBoolean _connectionStopped = new AtomicBoolean(); |
| |
| /** Used to indicate that this session has a message listener attached to it. */ |
| private boolean _hasMessageListeners; |
| |
| /** Used to indicate that this session has been suspended. */ |
| private boolean _suspended; |
| |
| /** |
| * Used to protect the suspension of this session, so that critical code can be executed during suspension, |
| * without the session being resumed by other threads. |
| */ |
| private final Object _suspensionLock = new Object(); |
| |
| private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); |
| |
| private final boolean _immediatePrefetch; |
| |
| private final boolean _strictAMQP; |
| |
| private final boolean _strictAMQPFATAL; |
| private final Lock _messageDeliveryLock = new ReentrantLock(true); |
| |
| /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ |
| private boolean _dirty; |
| /** Has failover occured on this session with outstanding actions to commit? */ |
| private boolean _failedOverDirty; |
| |
| private MessageEncryptionHelper _messageEncryptionHelper; |
| |
| /** Holds the highest received delivery tag. */ |
| protected AtomicLong getHighestDeliveryTag() |
| { |
| return _highestDeliveryTag; |
| } |
| |
| /** Pre-fetched message tags */ |
| protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags() |
| { |
| return _prefetchedMessageTags; |
| } |
| |
| /** All the not yet acknowledged message tags */ |
| protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags() |
| { |
| return _unacknowledgedMessageTags; |
| } |
| |
| /** All the delivered message tags */ |
| protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags() |
| { |
| return _deliveredMessageTags; |
| } |
| |
| /** Holds the dispatcher thread for this session. */ |
| protected Dispatcher getDispatcher() |
| { |
| return _dispatcher; |
| } |
| |
| protected Thread getDispatcherThread() |
| { |
| return _dispatcherThread; |
| } |
| |
| /** Holds the message factory factory for this session. */ |
| protected MessageFactoryRegistry getMessageFactoryRegistry() |
| { |
| return _messageFactoryRegistry; |
| } |
| |
| private final ExecutorService _flowControlNoAckTaskPool; |
| |
| /** |
| * Consumers associated with this session |
| */ |
| protected Collection<C> getConsumers() |
| { |
| return new ArrayList<>(_consumers.values()); |
| } |
| |
| protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) |
| { |
| _usingDispatcherForCleanup = usingDispatcherForCleanup; |
| } |
| |
| /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ |
| protected boolean isImmediatePrefetch() |
| { |
| return _immediatePrefetch; |
| } |
| |
/**
 * Subclass hook concerned with deleting the node of the given destination.
 * NOTE(review): the exact semantics live in the implementing subclasses, which are not visible here.
 *
 * @param dest the destination concerned
 * @throws QpidException declared so that implementations can report a failure
 */
| abstract void handleNodeDelete(final AMQDestination dest) throws QpidException; |
| |
/**
 * Subclass hook concerned with deleting the link of the given destination.
 * NOTE(review): the exact semantics live in the implementing subclasses, which are not visible here.
 *
 * @param dest the destination concerned
 * @throws QpidException declared so that implementations can report a failure
 */
| abstract void handleLinkDelete(final AMQDestination dest) throws QpidException; |
| |
/**
 * Builds a session that belongs to the given connection.
 * <p>
 * Besides storing its arguments, the constructor reads the strict AMQP, immediate prefetch and dispatcher
 * shutdown timeout system properties and creates the message queue of the session. For a NO_ACKNOWLEDGE
 * session the queue gets a threshold listener that schedules a suspend or resume task on a dedicated
 * single-threaded executor whenever the prefetch marks are crossed.
 *
 * @param con The connection on which to create the session.
 * @param channelId The unique identifier for the session.
 * @param transacted Indicates whether or not the session is transactional; when {@code true} the
 *                   acknowledgement mode is forced to SESSION_TRANSACTED.
 * @param acknowledgeMode The acknowledgement mode for the session, used when the session is not transacted.
 * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
 */
protected AMQSession(AMQConnection con,
                     int channelId,
                     boolean transacted,
                     int acknowledgeMode,
                     int defaultPrefetchHighMark,
                     int defaultPrefetchLowMark)
{
    _useAMQPEncodedMapMessage = con == null || !con.isUseLegacyMapMessageFormat();
    _useAMQPEncodedStreamMessage = con != null && !con.isUseLegacyStreamMessageFormat();

    _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
    _strictAMQPFATAL = Boolean.parseBoolean(
            System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
    // Strict AMQP always implies immediate prefetch; otherwise the dedicated property decides.
    _immediatePrefetch = _strictAMQP
            || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
    _dispatcherShutdownTimeoutMs =
            Integer.parseInt(System.getProperty(DISPATCHER_SHUTDOWN_TIMEOUT_MS, DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT));

    _connection = con;
    _transacted = transacted;
    // A transacted session ignores the requested acknowledgement mode.
    _acknowledgeMode = transacted ? javax.jms.Session.SESSION_TRANSACTED : acknowledgeMode;
    _messageEncryptionHelper = new MessageEncryptionHelper(this);
    _channelId = channelId;
    _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(this);

    _prefetchHighMark = defaultPrefetchHighMark;

    if (_acknowledgeMode != NO_ACKNOWLEDGE)
    {
        _prefetchLowMark = defaultPrefetchLowMark;
        _flowControlNoAckTaskPool = null;
        _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
    }
    else
    {
        // Equal, positive marks would leave no gap between suspending and resuming,
        // so the low mark becomes half of the high mark (at least one).
        if (defaultPrefetchLowMark == defaultPrefetchHighMark && defaultPrefetchHighMark > 0)
        {
            _prefetchLowMark = Math.max(defaultPrefetchHighMark / 2, 1);
        }
        else
        {
            _prefetchLowMark = defaultPrefetchLowMark;
        }

        // we coalesce suspend jobs using single threaded pool executor with queue length of one
        // and discarding policy
        final ThreadFactory daemonThreadFactory = new ThreadFactory()
        {
            @Override
            public Thread newThread(final Runnable r)
            {
                final Thread worker =
                        new Thread(r, "Connection_" + _connection.getConnectionNumber() + "_session_" + _channelId);
                if (!worker.isDaemon())
                {
                    worker.setDaemon(true);
                }

                return worker;
            }
        };
        _flowControlNoAckTaskPool = new ThreadPoolExecutor(1, 1,
                                                           0L, TimeUnit.MILLISECONDS,
                                                           new LinkedBlockingQueue<Runnable>(1),
                                                           daemonThreadFactory,
                                                           new ThreadPoolExecutor.DiscardPolicy());

        final FlowControllingBlockingQueue.ThresholdListener thresholdListener =
                new FlowControllingBlockingQueue.ThresholdListener()
                {
                    /** true while the channel is meant to be suspended. */
                    private final AtomicBoolean _suspendState = new AtomicBoolean();

                    public void aboveThreshold(int currentValue)
                    {
                        final boolean open = !AMQSession.this.isClosed() && !AMQSession.this.isClosing();
                        // Only execute change if previous state was false
                        if (open && !_suspendState.getAndSet(true))
                        {
                            _logger.debug(
                                    "Above threshold ({}) so suspending channel. Current value is {}",
                                    _prefetchHighMark,
                                    currentValue);

                            doSuspend();
                        }
                    }

                    public void underThreshold(int currentValue)
                    {
                        final boolean open = !AMQSession.this.isClosed() && !AMQSession.this.isClosing();
                        // Only execute change if previous state was true
                        if (open && _suspendState.getAndSet(false))
                        {
                            _logger.debug(
                                    "Below threshold ({}) so unsuspending channel. Current value is {}",
                                    _prefetchLowMark,
                                    currentValue);
                            doSuspend();
                        }
                    }

                    /** Hands the current suspend state to the task pool. */
                    private void doSuspend()
                    {
                        _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
                    }
                };
        _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, thresholdListener);
    }

    // Add creation logging to tie in with the existing close logging
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Created session:" + this);
    }
}
| |
| // ===== JMS Session methods. |
| |
| /** |
| * Closes the session with no timeout. |
| * |
| * @throws JMSException If the JMS provider fails to close the session due to some internal error. |
| */ |
| @Override |
| public void close() throws JMSException |
| { |
| close(-1); |
| } |
| |
/**
 * Returns the last exception recorded for this session, as supplied by the subclass.
 * checkNotClosed() treats a {@code null} result as "no underlying cause".
 *
 * @return the last exception, or {@code null} when there is none
 */
| public abstract QpidException getLastException(); |
| |
| public void checkNotClosed() throws JMSException |
| { |
| try |
| { |
| super.checkNotClosed(); |
| } |
| catch (IllegalStateException ise) |
| { |
| QpidException ex = getLastException(); |
| if (ex != null) |
| { |
| int code = 0; |
| |
| if (ex instanceof AMQException) |
| { |
| code = ((AMQException) ex).getErrorCode(); |
| } |
| |
| throw JMSExceptionHelper.chainJMSException(new IllegalStateException("Session has been closed", |
| code == 0 ? null : Integer.toString(code)), ex); |
| } |
| else |
| { |
| throw ise; |
| } |
| } |
| } |
| |
| public BytesMessage createBytesMessage() throws JMSException |
| { |
| checkNotClosed(); |
| JMSBytesMessage msg = new JMSBytesMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| |
| /** |
| * Acknowledges all unacknowledged messages on the session, for all message consumers on the session. |
| * |
| * @throws IllegalStateException If the session is closed. |
| * @throws JMSException if there is a problem during acknowledge process. |
| */ |
| public void acknowledge() throws IllegalStateException, JMSException |
| { |
| if (isClosed()) |
| { |
| throw new IllegalStateException("Session is already closed"); |
| } |
| else if (hasFailedOverDirty()) |
| { |
| //perform an implicit recover in this scenario |
| recover(); |
| |
| //notify the consumer |
| throw new IllegalStateException("has failed over"); |
| } |
| |
| try |
| { |
| acknowledgeImpl(); |
| markClean(); |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e); |
| } |
| } |
| |
| public void setLegacyFieldsForQueueType(AMQDestination dest) |
| { |
| // legacy support |
| dest.setQueueName(dest.getAddressName()); |
| dest.setExchangeName(""); |
| dest.setExchangeClass(""); |
| dest.setRoutingKey(dest.getAMQQueueName()); |
| } |
| |
| public void setLegacyFieldsForTopicType(AMQDestination dest) |
| { |
| // legacy support |
| dest.setExchangeName(dest.getAddressName()); |
| Node node = dest.getNode(); |
| dest.setExchangeClass(node.getExchangeType() == null |
| ? ExchangeDefaults.TOPIC_EXCHANGE_CLASS |
| : node.getExchangeType()); |
| dest.setRoutingKey(dest.getSubject()); |
| } |
| |
| protected void verifySubject(AMQDestination dest) throws QpidException |
| { |
| if (dest.getSubject() == null || dest.getSubject().trim().equals("")) |
| { |
| |
| if ("topic".equals(dest.getExchangeClass())) |
| { |
| dest.setRoutingKey("#"); |
| dest.setSubject(dest.getRoutingKey()); |
| } |
| else |
| { |
| dest.setRoutingKey(""); |
| dest.setSubject(""); |
| } |
| } |
| } |
| |
/**
 * Tells whether the exchange addressed by the given destination exists; implemented by subclasses.
 * NOTE(review): what {@code assertNode} checks beyond existence is decided by the implementation - confirm there.
 *
 * @param dest the destination naming the exchange
 * @param assertNode whether the node should be asserted, as derived from the address options by resolveAddress
 * @return {@code true} if the exchange exists
 * @throws QpidException declared so that implementations can report a failure
 */
| public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws QpidException; |
| |
/**
 * Tells whether the queue addressed by the given destination exists; implemented by subclasses.
 * NOTE(review): what {@code assertNode} checks beyond existence is decided by the implementation - confirm there.
 *
 * @param dest the destination naming the queue
 * @param assertNode whether the node should be asserted, as derived from the address options by resolveAddress
 * @return {@code true} if the queue exists
 * @throws QpidException declared so that implementations can report a failure
 */
| public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException; |
| |
| /** |
| * 1. Try to resolve the address type (queue or exchange) |
| * 2. if type == queue, |
| * 2.1 verify queue exists or create if create == true |
| * 2.2 If not throw exception |
| * |
| * 3. if type == exchange, |
| * 3.1 verify exchange exists or create if create == true |
| * 3.2 if not throw exception |
| * 3.3 if exchange exists (or created) create subscription queue. |
| */ |
| |
| @SuppressWarnings("deprecation") |
| public void resolveAddress(AMQDestination dest, |
| boolean isConsumer, |
| boolean noLocal) throws QpidException |
| { |
| if (isResolved(dest)) |
| { |
| return; |
| } |
| else |
| { |
| boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) || |
| (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) || |
| (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER); |
| |
| boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) || |
| (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) || |
| (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER); |
| |
| |
| int suppliedType = dest.getNode() == null ? AMQDestination.UNKNOWN_TYPE : dest.getNode().getType(); |
| int type = resolveAddressType(dest); |
| switch (type) |
| { |
| case AMQDestination.QUEUE_TYPE: |
| |
| setLegacyFieldsForQueueType(dest); |
| if(createNode) |
| { |
| handleQueueNodeCreation(dest,noLocal); |
| break; |
| } |
| else if (isQueueExist(dest,assertNode) || suppliedType == AMQDestination.QUEUE_TYPE) |
| { |
| break; |
| } |
| case AMQDestination.TOPIC_TYPE: |
| if(suppliedType != AMQDestination.QUEUE_TYPE) |
| { |
| setLegacyFieldsForTopicType(dest); |
| if (createNode) |
| { |
| verifySubject(dest); |
| handleExchangeNodeCreation(dest); |
| break; |
| } |
| else if (isExchangeExist(dest, assertNode) || suppliedType == AMQDestination.TOPIC_TYPE) |
| { |
| verifySubject(dest); |
| break; |
| } |
| } |
| default: |
| throw new QpidException( |
| "The name '" + dest.getAddressName() + |
| "' supplied in the address doesn't resolve to an exchange or a queue"); |
| } |
| setResolved(dest); |
| } |
| } |
| |
| void setResolved(final AMQDestination dest) |
| { |
| _resolvedDestinations.put(dest, new WeakReference<>(dest)); |
| } |
| |
| void setUnresolved(final AMQDestination dest) |
| { |
| _resolvedDestinations.remove(dest); |
| } |
| |
| private void clearResolvedDestinations() |
| { |
| _resolvedDestinations.clear(); |
| } |
| |
| boolean isResolved(final AMQDestination dest) |
| { |
| final WeakReference<AMQDestination> resolvedDestRef = _resolvedDestinations.get(dest); |
| final AMQDestination resolvedDest = resolvedDestRef == null ? null : resolvedDestRef.get(); |
| if (resolvedDest == dest) |
| { |
| return true; |
| } |
| else if (resolvedDest == null) |
| { |
| return false; |
| } |
| |
| // verify legacy fields are equal |
| return Objects.equals(dest.getQueueName(), resolvedDest.getQueueName()) && |
| Objects.equals(dest.getExchangeName(), resolvedDest.getExchangeName()) && |
| Objects.equals(dest.getExchangeClass(), resolvedDest.getExchangeClass()) && |
| Objects.equals(dest.getRoutingKey(), resolvedDest.getRoutingKey()) && |
| Objects.equals(dest.getSubject(), resolvedDest.getSubject()); |
| } |
| |
/**
 * Determines the type of node the given destination addresses; implemented by subclasses.
 * resolveAddress() switches on the result, handling AMQDestination.QUEUE_TYPE and AMQDestination.TOPIC_TYPE
 * and rejecting every other value.
 *
 * @param dest the destination whose address type is wanted
 * @return one of the AMQDestination type constants
 * @throws QpidException declared so that implementations can report a failure
 */
| public abstract int resolveAddressType(AMQDestination dest) throws QpidException; |
| |
/**
 * Performs the actual acknowledgement on behalf of acknowledge(); implemented by subclasses.
 * acknowledge() calls it only after checking that the session is open and has not failed over while dirty.
 *
 * @throws JMSException if the acknowledgement fails
 */
| protected abstract void acknowledgeImpl() throws JMSException; |
| |
| /** |
| * Acknowledges one or many messages. Implemented by subclasses. |
| * |
| * @param deliveryTag The tag of the last message to be acknowledged. |
| * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the |
| * delivery tag, <tt>false</tt> to acknowledge just that message. |
| * |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| public abstract void acknowledgeMessage(long deliveryTag, boolean multiple); |
| |
| /** |
| * Binds the named queue, with the specified routing key, to the named exchange. |
| * <p> |
| * Note that this operation automatically retries in the event of fail-over. |
| * |
| * @param queueName The name of the queue to bind. |
| * @param routingKey The routing key to bind the queue with. |
| * @param arguments Additional arguments. |
| * @param exchangeName The exchange to bind the queue on. |
| * |
| * @throws QpidException If the queue cannot be bound for any reason. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| * TODO Document the additional arguments that may be passed in the field table. Are these for headers exchanges? |
| */ |
| public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments, |
| final String exchangeName, final AMQDestination destination) throws QpidException |
| { |
| bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); |
| } |
| |
| public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments, |
| final String exchangeName, final AMQDestination destination, |
| final boolean nowait) throws QpidException |
| { |
| /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ |
| new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() |
| { |
| public Object execute() throws QpidException, FailoverException |
| { |
| sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait); |
| return null; |
| } |
| }, _connection).execute(); |
| } |
| |
| public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws QpidException |
| { |
| if (consumer.getQueuename() != null) |
| { |
| bindQueue(consumer.getQueuename(), routingKey, new HashMap<String, Object>(), amqd.getExchangeName(), amqd); |
| } |
| } |
| |
/**
 * Sends the request that binds a queue to an exchange; implemented by subclasses.
 * bindQueue() calls it as the body of a fail-over protected operation.
 *
 * @param queueName the name of the queue to bind
 * @param routingKey the routing key to bind the queue with
 * @param arguments additional arguments
 * @param exchangeName the exchange to bind the queue on
 * @param destination the destination the binding is made for
 * @param nowait NOTE(review): presumably tells the implementation not to wait for the broker's reply - confirm
 *               against the implementations
 * @throws QpidException if the bind fails
 * @throws FailoverException if a fail-over interrupts the operation
 */
| public abstract void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, |
| final String exchangeName, AMQDestination destination, |
| final boolean nowait) throws QpidException, FailoverException; |
| |
| /** |
| * Closes the session. |
| * <p> |
| * Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close |
| * the channel. This is because the channel is marked as closed before the request to close it is made, so the |
| * fail-over should not re-open it. |
| * |
| * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker. |
| * |
| * @throws JMSException If the JMS provider fails to close the session due to some internal error. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| * TODO Not certain about the logic of ignoring the failover exception, because the channel won't be |
| * re-opened. May need to examine this more carefully. |
| * TODO Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, |
| * because the failover process sends the failover event before acquiring the mutex itself. |
| */ |
| public void close(long timeout) throws JMSException |
| { |
| setClosing(true); |
| lockMessageDelivery(); |
| try |
| { |
| // We must close down all producers and consumers in an orderly fashion. This is the only method |
| // that can be called from a different thread of control from the one controlling the session. |
| synchronized (getFailoverMutex()) |
| { |
| close(timeout, true); |
| } |
| } |
| finally |
| { |
| unlockMessageDelivery(); |
| } |
| } |
| |
| private void close(long timeout, boolean sendClose) throws JMSException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Closing session: " + this); |
| } |
| |
| // Ensure we only try and close an open session. |
| if (!setClosed()) |
| { |
| setClosing(true); |
| // we pass null since this is not an error case |
| closeProducersAndConsumers(null); |
| |
| try |
| { |
| // If the connection is open or we are in the process |
| // of closing the connection then send a cance |
| // no point otherwise as the connection will be gone |
| if (!_connection.isClosed() || _connection.isClosing()) |
| { |
| if (sendClose) |
| { |
| sendClose(timeout); |
| } |
| } |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing session: " + e), e); |
| } |
| // This is ignored because the channel is already marked as closed so the fail-over process will |
| // not re-open it. |
| catch (FailoverException e) |
| { |
| _logger.debug( |
| "Got FailoverException during channel close, ignored as channel already marked as closed."); |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Error closing session:" + e.getMessage(), e); |
| } |
| finally |
| { |
| shutdownFlowControlNoAckTaskPool(); |
| _connection.deregisterSession(_channelId); |
| } |
| } |
| } |
| |
/**
 * Sends the request to close this session to the broker; implemented by subclasses.
 *
 * @param timeout the timeout in milliseconds to wait for the close acknowledgement, as passed to close(long)
 * @throws QpidException if the close fails
 * @throws FailoverException if a fail-over interrupts the operation
 */
| public abstract void sendClose(long timeout) throws QpidException, FailoverException; |
| |
| /** |
| * Called when the server initiates the closure of the session unilaterally. |
| * |
| * @param e the exception that caused this session to be closed. Null causes the |
| */ |
| public void closed(Throwable e) throws JMSException |
| { |
| // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived |
| // calls through connection.closeAllSessions which is also called by the public connection.close() |
| // with a null cause |
| // When we are closing the Session due to a protocol session error we simply create a new AMQException |
| // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. |
| // We need to determine here if the connection should be |
| |
| if (e instanceof AMQDisconnectedException) |
| { |
| // Failover failed and ain't coming back. Knife the dispatcher. |
| stopDispatcherThread(); |
| |
| } |
| |
| //if we don't have an exception then we can perform closing operations |
| setClosing(e == null); |
| |
| if (!setClosed()) |
| { |
| // An AMQException has an error code and message already and will be passed in when closure occurs as a |
| // result of a channel close request |
| QpidException amqe; |
| if (e instanceof QpidException) |
| { |
| amqe = (QpidException) e; |
| } |
| else |
| { |
| amqe = new QpidException("Closing session forcibly", e); |
| } |
| |
| _connection.deregisterSession(_channelId); |
| closeProducersAndConsumers(amqe); |
| shutdownFlowControlNoAckTaskPool(); |
| } |
| |
| } |
| |
| protected void stopDispatcherThread() |
| { |
| if (_dispatcherThread != null) |
| { |
| _dispatcherThread.interrupt(); |
| } |
| } |
| /** |
| * Commits all messages done in this transaction and releases any locks currently held. |
| * <p> |
| * If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the |
| * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. |
| * The client will be unable to determine whether or not the commit actually happened on the broker in this case. |
| * |
| * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does |
| * not mean that the commit is known to have failed, merely that it is not known whether it |
| * failed or not. |
| */ |
| public void commit() throws JMSException |
| { |
| checkTransacted(); |
| |
| //Check that we are clean to commit. |
| if (_failedOverDirty) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back."); |
| } |
| rollback(); |
| |
| throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." + |
| "The session transaction was rolled back."); |
| } |
| |
| try |
| { |
| commitImpl(); |
| markClean(); |
| } |
| catch (QpidException e) |
| { |
| throw toJMSException("Exception during commit: " + e.getMessage() + (e.getCause() == null ? "" : " (Caused by : " + e.getCause() + ")"), e); |
| } |
| catch (FailoverException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Fail-over interrupted commit. Status of the commit is uncertain."), e); |
| } |
| catch(TransportException e) |
| { |
| throw toJMSException("Session exception occurred while trying to commit: " + e.getMessage(), e); |
| } |
| } |
| |
| protected abstract void commitImpl() throws QpidException, FailoverException, TransportException; |
| |
| |
| |
| |
| public void confirmConsumerCancelled(String consumerTag) |
| { |
| C consumer = _consumers.get(consumerTag); |
| if (consumer != null) |
| { |
| if (!consumer.isBrowseOnly()) // Normal Consumer |
| { |
| // Clean the Maps up first |
| // Flush any pending messages for this consumerTag |
| if (_dispatcher != null) |
| { |
| _logger.debug("Dispatcher is not null"); |
| } |
| else |
| { |
| _logger.debug("Dispatcher is null so created stopped dispatcher"); |
| startDispatcherIfNecessary(true); |
| } |
| |
| rejectPending(consumer); |
| } |
| else // Queue Browser |
| { |
| // Just close the consumer |
| // fixme the CancelOK is being processed before the arriving messages.. |
| // The dispatcher is still to process them so the server sent in order but the client |
| // has yet to receive before the close comes in |
| |
| if (consumer.isAutoClose()) |
| { |
| // There is a small window where the message is between the two queues in the dispatcher. |
| if (consumer.isClosed()) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Closing consumer:" + consumer.debugIdentity()); |
| } |
| |
| deregisterConsumer(consumer); |
| } |
| else |
| { |
| _queue.add(new CloseConsumerMessage(consumer)); |
| } |
| } |
| } |
| } |
| } |
| |
| private void rejectPending(C consumer) |
| { |
| // Reject messages on pre-receive queue |
| consumer.releasePendingMessages(); |
| |
| // Reject messages on pre-dispatch queue |
| rejectMessagesForConsumerTag(consumer.getConsumerTag()); |
| |
| // closeConsumer |
| consumer.markClosed(); |
| } |
| |
| public QueueBrowser createBrowser(Queue queue) throws JMSException |
| { |
| if (isStrictAMQP()) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| return createBrowser(queue, null); |
| } |
| |
| /** |
| * Create a queue browser if the destination is a valid queue. |
| */ |
| public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException |
| { |
| if (isStrictAMQP()) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| checkNotClosed(); |
| checkValidQueue(queue); |
| |
| return new AMQQueueBrowser(this, queue, messageSelector); |
| } |
| |
| protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal) |
| throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false, |
| messageSelector, null, true, true); |
| } |
| |
| public MessageConsumer createConsumer(Destination destination) throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, |
| isBrowseOnlyDestination(destination), false); |
| } |
| |
| public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), |
| messageSelector, null, isBrowseOnlyDestination(destination), false); |
| } |
| |
| public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) |
| throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), |
| messageSelector, null, isBrowseOnlyDestination(destination), false); |
| } |
| |
| public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, |
| String selector) throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false); |
| } |
| |
| public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, |
| boolean exclusive, String selector) throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false); |
| } |
| |
| public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, |
| boolean exclusive, String selector, Map<String,Object> rawSelector) throws JMSException |
| { |
| checkValidDestination(destination); |
| |
| return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination), |
| false); |
| } |
| |
| public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException |
| { |
| // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method |
| return createDurableSubscriber(topic, name, null, false); |
| } |
| |
| public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) |
| throws JMSException |
| { |
| checkNotClosed(); |
| Topic origTopic = checkValidTopic(topic, true); |
| |
| AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); |
| if (dest.getDestSyntax() == DestSyntax.ADDR && !isResolved(dest)) |
| { |
| try |
| { |
| resolveAddress(dest,false,noLocal); |
| if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) |
| { |
| throw new JMSException("Durable subscribers can only be created for Topics"); |
| } |
| } |
| catch(QpidException e) |
| { |
| throw toJMSException("Error when verifying destination",e); |
| } |
| catch(TransportException e) |
| { |
| throw toJMSException("Error when verifying destination", e); |
| } |
| } |
| |
| String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; |
| |
| _subscriberDetails.lock(); |
| try |
| { |
| TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name); |
| |
| // Not subscribed to this name in the current session |
| if (subscriber == null) |
| { |
| // After the address is resolved routing key will not be null. |
| String topicName = dest.getRoutingKey(); |
| |
| if (_strictAMQP) |
| { |
| if (_strictAMQPFATAL) |
| { |
| throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); |
| } |
| else |
| { |
| _logger.warn("Unable to determine if subscription already exists for '" + topicName |
| + "' for creation durableSubscriber. Requesting queue deletion regardless."); |
| } |
| |
| deleteQueue(dest.getAMQQueueName()); |
| } |
| else |
| { |
| Map<String,Object> args = new HashMap<String,Object>(); |
| |
| // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a |
| // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise |
| // possible to determine when querying the broker whether there are no arguments or just a non-matching selector |
| // argument, as specifying null for the arguments when querying means they should not be checked at all |
| args.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); |
| if(noLocal) |
| { |
| args.put(AMQPFilterTypes.NO_LOCAL.getValue(), true); |
| } |
| |
| // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec |
| // says we must trash the subscription. |
| boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); |
| boolean isQueueBoundForTopicAndSelector = |
| isQueueBound(dest.getExchangeName(), |
| dest.getAMQQueueName(), |
| topicName, args); |
| |
| if (isQueueBound && !isQueueBoundForTopicAndSelector) |
| { |
| deleteQueue(dest.getAMQQueueName()); |
| } |
| else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match |
| { |
| try |
| { |
| bindQueue(dest.getAMQQueueName(), |
| dest.getRoutingKey(), |
| args, |
| dest.getExchangeName(), |
| dest, |
| true); |
| } |
| catch(QpidException e) |
| { |
| throw toJMSException("Error when checking binding",e); |
| } |
| } |
| } |
| } |
| else |
| { |
| // Subscribed with the same topic and no current / previous or same selector |
| if (subscriber.getTopic().equals(topic) |
| && ((messageSelector == null && subscriber.getMessageSelector() == null) |
| || (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector())))) |
| { |
| throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name |
| + (messageSelector != null ? " and selector " + messageSelector : "")); |
| } |
| else |
| { |
| unsubscribe(name, true); |
| } |
| |
| } |
| |
| _subscriberAccess.lock(); |
| try |
| { |
| C consumer = (C) createConsumer(dest, messageSelector, noLocal); |
| consumer.markAsDurableSubscriber(); |
| subscriber = new TopicSubscriberAdaptor<C>(dest, consumer); |
| |
| // Save subscription information |
| _subscriptions.put(name, subscriber); |
| _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); |
| } |
| finally |
| { |
| _subscriberAccess.unlock(); |
| } |
| |
| return subscriber; |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e); |
| } |
| finally |
| { |
| _subscriberDetails.unlock(); |
| } |
| } |
| |
| public ListMessage createListMessage() throws JMSException |
| { |
| checkNotClosed(); |
| AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| |
| public MapMessage createMapMessage() throws JMSException |
| { |
| checkNotClosed(); |
| if (_useAMQPEncodedMapMessage) |
| { |
| AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| else |
| { |
| JMSMapMessage msg = new JMSMapMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| } |
| |
| public javax.jms.Message createMessage() throws JMSException |
| { |
| return createBytesMessage(); |
| } |
| |
| public ObjectMessage createObjectMessage() throws JMSException |
| { |
| checkNotClosed(); |
| JMSObjectMessage msg = new JMSObjectMessage(getAMQConnection(), getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| |
| public ObjectMessage createObjectMessage(Serializable object) throws JMSException |
| { |
| ObjectMessage msg = createObjectMessage(); |
| msg.setObject(object); |
| |
| return msg; |
| } |
| |
| public P createProducer(Destination destination) throws JMSException |
| { |
| return createProducerImpl(destination, null, null); |
| } |
| |
| public P createProducer(Destination destination, boolean immediate) throws JMSException |
| { |
| return createProducerImpl(destination, null, immediate); |
| } |
| |
| public P createProducer(Destination destination, boolean mandatory, boolean immediate) |
| throws JMSException |
| { |
| return createProducerImpl(destination, mandatory, immediate); |
| } |
| |
| public TopicPublisher createPublisher(Topic topic) throws JMSException |
| { |
| checkNotClosed(); |
| |
| return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic); |
| } |
| |
| public Queue createQueue(String queueName) throws JMSException |
| { |
| checkNotClosed(); |
| try |
| { |
| if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1) |
| { |
| DestSyntax syntax = AMQDestination.getDestType(queueName); |
| if (syntax == AMQDestination.DestSyntax.BURL) |
| { |
| // For testing we may want to use the prefix |
| return new AMQQueue(getDefaultQueueExchangeName(), |
| AMQDestination.stripSyntaxPrefix(queueName)); |
| } |
| else |
| { |
| AMQQueue queue = new AMQQueue(queueName); |
| return queue; |
| |
| } |
| } |
| else |
| { |
| return new AMQQueue(queueName); |
| } |
| } |
| catch (URISyntaxException urlse) |
| { |
| _logger.error("", urlse); |
| throw JMSExceptionHelper.chainJMSException(new JMSException(urlse.getReason()), urlse); |
| } |
| |
| } |
| |
| /** |
| * Declares the named queue. |
| * <p> |
| * Note that this operation automatically retries in the event of fail-over. |
| * |
| * @param name The name of the queue to declare. |
| * @param autoDelete |
| * @param durable Flag to indicate that the queue is durable. |
| * @param exclusive Flag to indicate that the queue is exclusive to this client. |
| * |
| * @throws QpidException If the queue cannot be declared for any reason. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| public void createQueue(final String name, final boolean autoDelete, final boolean durable, |
| final boolean exclusive) throws QpidException |
| { |
| createQueue(name, autoDelete, durable, exclusive, null); |
| } |
| |
| /** |
| * Declares the named queue. |
| * <p> |
| * Note that this operation automatically retries in the event of fail-over. |
| * |
| * @param name The name of the queue to declare. |
| * @param autoDelete |
| * @param durable Flag to indicate that the queue is durable. |
| * @param exclusive Flag to indicate that the queue is exclusive to this client. |
| * @param arguments Arguments used to set special properties of the queue |
| * |
| * @throws QpidException If the queue cannot be declared for any reason. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| public void createQueue(final String name, final boolean autoDelete, final boolean durable, |
| final boolean exclusive, final Map<String, Object> arguments) throws QpidException |
| { |
| new FailoverRetrySupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() |
| { |
| public Object execute() throws QpidException, FailoverException |
| { |
| sendCreateQueue(name, autoDelete, durable, exclusive, arguments); |
| return null; |
| } |
| }, _connection).execute(); |
| } |
| |
| public abstract void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, |
| final boolean exclusive, final Map<String, Object> arguments) throws |
| QpidException, FailoverException; |
| |
| /** |
| * Creates a QueueReceiver |
| * |
| * @param destination |
| * |
| * @return QueueReceiver - a wrapper around our MessageConsumer |
| * |
| * @throws JMSException |
| */ |
| public QueueReceiver createQueueReceiver(Destination destination) throws JMSException |
| { |
| checkValidDestination(destination); |
| Queue dest = validateQueue(destination); |
| C consumer = (C) createConsumer(dest); |
| consumer.setAddressType(AMQDestination.QUEUE_TYPE); |
| return new QueueReceiverAdaptor(dest, consumer); |
| } |
| |
| /** |
| * Creates a QueueReceiver using a message selector |
| * |
| * @param destination |
| * @param messageSelector |
| * |
| * @return QueueReceiver - a wrapper around our MessageConsumer |
| * |
| * @throws JMSException |
| */ |
| public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException |
| { |
| checkValidDestination(destination); |
| Queue dest = validateQueue(destination); |
| C consumer = (C) createConsumer(dest, messageSelector); |
| consumer.setAddressType(AMQDestination.QUEUE_TYPE); |
| return new QueueReceiverAdaptor(dest, consumer); |
| } |
| |
| /** |
| * Creates a QueueReceiver wrapping a MessageConsumer |
| * |
| * @param queue |
| * |
| * @return QueueReceiver |
| * |
| * @throws JMSException |
| */ |
| public QueueReceiver createReceiver(Queue queue) throws JMSException |
| { |
| checkNotClosed(); |
| Queue dest = validateQueue(queue); |
| C consumer = (C) createConsumer(dest); |
| consumer.setAddressType(AMQDestination.QUEUE_TYPE); |
| return new QueueReceiverAdaptor(dest, consumer); |
| } |
| |
| /** |
| * Creates a QueueReceiver wrapping a MessageConsumer using a message selector |
| * |
| * @param queue |
| * @param messageSelector |
| * |
| * @return QueueReceiver |
| * |
| * @throws JMSException |
| */ |
| public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException |
| { |
| checkNotClosed(); |
| Queue dest = validateQueue(queue); |
| C consumer = (C) createConsumer(dest, messageSelector); |
| consumer.setAddressType(AMQDestination.QUEUE_TYPE); |
| return new QueueReceiverAdaptor(dest, consumer); |
| } |
| |
| private Queue validateQueue(Destination dest) throws InvalidDestinationException |
| { |
| if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) |
| { |
| return (Queue)dest; |
| } |
| else |
| { |
| throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue"); |
| } |
| } |
| |
| public QueueSender createSender(Queue queue) throws JMSException |
| { |
| checkNotClosed(); |
| |
| return new QueueSenderAdapter(createProducer(queue), queue); |
| } |
| |
| public StreamMessage createStreamMessage() throws JMSException |
| { |
| checkNotClosed(); |
| if (_useAMQPEncodedStreamMessage) |
| { |
| AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| else |
| { |
| JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| } |
| |
| /** |
| * Creates a non-durable subscriber |
| * |
| * @param topic |
| * |
| * @return TopicSubscriber - a wrapper round our MessageConsumer |
| * |
| * @throws JMSException |
| */ |
| public TopicSubscriber createSubscriber(Topic topic) throws JMSException |
| { |
| checkNotClosed(); |
| checkValidTopic(topic); |
| |
| return new TopicSubscriberAdaptor<C>(topic, |
| createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false)); |
| } |
| |
| /** |
| * Creates a non-durable subscriber with a message selector |
| * |
| * @param topic |
| * @param messageSelector |
| * @param noLocal |
| * |
| * @return TopicSubscriber - a wrapper round our MessageConsumer |
| * |
| * @throws JMSException |
| */ |
| public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException |
| { |
| checkNotClosed(); |
| checkValidTopic(topic); |
| |
| return new TopicSubscriberAdaptor<C>(topic, |
| createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal, |
| true, messageSelector, null, false, false)); |
| } |
| |
| public TemporaryQueue createTemporaryQueue() throws JMSException |
| { |
| checkNotClosed(); |
| try |
| { |
| AMQTemporaryQueue result = new AMQTemporaryQueue(this); |
| |
| // this is done so that we can produce to a temporary queue before we create a consumer |
| result.setQueueName(result.getRoutingKey()); |
| Map<String, Object> args; |
| if(_connection.getDelegate().isQueueLifetimePolicySupported()) |
| { |
| args = new HashMap<>(); |
| args.put("qpid.lifetime_policy", "DELETE_ON_CONNECTION_CLOSE"); |
| args.put("qpid.exclusivity_policy", "CONNECTION"); |
| } |
| else |
| { |
| args = null; |
| } |
| createQueue(result.getAMQQueueName(), result.isAutoDelete(), |
| result.isDurable(), result.isExclusive(), |
| args); |
| bindQueue(result.getAMQQueueName(), result.getRoutingKey(), |
| new HashMap<String, Object>(), result.getExchangeName(), result); |
| return result; |
| } |
| catch (QpidException e) |
| { |
| throw toJMSException("Cannot create temporary queue",e); |
| } |
| catch(TransportException e) |
| { |
| throw toJMSException("Cannot create temporary queue: " + e.getMessage(), e); |
| } |
| catch(Exception e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Cannot create temporary queue: " |
| + e.getMessage()), e); |
| } |
| } |
| |
| public TemporaryTopic createTemporaryTopic() throws JMSException |
| { |
| checkNotClosed(); |
| |
| return new AMQTemporaryTopic(this); |
| } |
| |
| public TextMessage createTextMessage() throws JMSException |
| { |
| checkNotClosed(); |
| |
| JMSTextMessage msg = new JMSTextMessage(getMessageDelegateFactory()); |
| msg.setAMQSession(this); |
| return msg; |
| } |
| |
| protected Object getFailoverMutex() |
| { |
| return _connection.getFailoverMutex(); |
| } |
| |
| public TextMessage createTextMessage(String text) throws JMSException |
| { |
| |
| TextMessage msg = createTextMessage(); |
| msg.setText(text); |
| |
| return msg; |
| } |
| |
| public Topic createTopic(String topicName) throws JMSException |
| { |
| checkNotClosed(); |
| try |
| { |
| if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1) |
| { |
| DestSyntax syntax = AMQDestination.getDestType(topicName); |
| // for testing we may want to use the prefix to indicate our choice. |
| topicName = AMQDestination.stripSyntaxPrefix(topicName); |
| if (syntax == AMQDestination.DestSyntax.BURL) |
| { |
| return new AMQTopic(getDefaultTopicExchangeName(), topicName); |
| } |
| else |
| { |
| return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName); |
| } |
| } |
| else |
| { |
| return new AMQTopic(topicName); |
| } |
| |
| } |
| catch (URISyntaxException urlse) |
| { |
| _logger.error("", urlse); |
| throw JMSExceptionHelper.chainJMSException(new JMSException(urlse.getReason()), urlse); |
| } |
| } |
| |
| public void declareExchange(String name, String type, boolean nowait) throws QpidException |
| { |
| declareExchange(name, type, nowait, false, false, false); |
| } |
| |
| @Override |
| public void deleteExchange(final String exchangeName) throws JMSException |
| { |
| try |
| { |
| new FailoverRetrySupport<>(new FailoverProtectedOperation<Object, QpidException>() |
| { |
| public Object execute() throws QpidException, FailoverException |
| { |
| sendExchangeDelete(exchangeName, false); |
| return null; |
| } |
| }, _connection).execute(); |
| } |
| catch (QpidException e) |
| { |
| throw toJMSException("The exchange deletion failed: " + e.getMessage(), e); |
| } |
| } |
| |
| abstract void sendExchangeDelete(final String name, final boolean nowait) throws QpidException, FailoverException; |
| |
| abstract public void sync() throws QpidException; |
| |
| public int getAcknowledgeMode() |
| { |
| return _acknowledgeMode; |
| } |
| |
| public AMQConnection getAMQConnection() |
| { |
| return _connection; |
| } |
| |
| public int getChannelId() |
| { |
| return _channelId; |
| } |
| |
| public int getDefaultPrefetch() |
| { |
| return _prefetchHighMark; |
| } |
| |
| public int getDefaultPrefetchHigh() |
| { |
| return _prefetchHighMark; |
| } |
| |
| public int getDefaultPrefetchLow() |
| { |
| return _prefetchLowMark; |
| } |
| |
| public int getPrefetch() |
| { |
| return _prefetchHighMark; |
| } |
| |
| public String getDefaultQueueExchangeName() |
| { |
| return _connection.getDefaultQueueExchangeName(); |
| } |
| |
| public String getDefaultTopicExchangeName() |
| { |
| return _connection.getDefaultTopicExchangeName(); |
| } |
| |
| public MessageListener getMessageListener() throws JMSException |
| { |
| return _messageListener; |
| } |
| |
| public String getTemporaryQueueExchangeName() |
| { |
| return _connection.getTemporaryQueueExchangeName(); |
| } |
| |
| public String getTemporaryTopicExchangeName() |
| { |
| return _connection.getTemporaryTopicExchangeName(); |
| } |
| |
| public int getTicket() |
| { |
| return _ticket; |
| } |
| |
| /** |
| * Indicates whether the session is in transacted mode. |
| * |
| * @return true if the session is in transacted mode |
| * @throws IllegalStateException - if session is closed. |
| */ |
| public boolean getTransacted() throws JMSException |
| { |
| // Sun TCK checks that javax.jms.IllegalStateException is thrown for closed session |
| // nowhere else this behavior is documented |
| checkNotClosed(); |
| return _transacted; |
| } |
| |
| /** |
| * Indicates whether the session is in transacted mode. |
| */ |
| public boolean isTransacted() |
| { |
| return _transacted; |
| } |
| |
| public boolean hasConsumer(Destination destination) |
| { |
| AtomicInteger counter = _destinationConsumerCount.get(destination); |
| |
| return (counter != null) && (counter.get() != 0); |
| } |
| |
| /** Indicates that warnings should be generated on violations of the strict AMQP. */ |
| public boolean isStrictAMQP() |
| { |
| return _strictAMQP; |
| } |
| |
| public boolean isSuspended() |
| { |
| return _suspended; |
| } |
| |
| protected void addUnacknowledgedMessage(long id) |
| { |
| _unacknowledgedMessageTags.add(id); |
| } |
| |
| protected void addDeliveredMessage(long id) |
| { |
| _deliveredMessageTags.add(id); |
| } |
| |
| /** |
| * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto |
| * the queue read by the dispatcher. |
| * |
| * @param message the message that has been received |
| */ |
| public void messageReceived(UnprocessedMessage message) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Message[" + message.toString() + "] received in session"); |
| } |
| _highestDeliveryTag.set(message.getDeliveryTag()); |
| _queue.add(message); |
| } |
| |
| public void declareAndBind(AMQDestination amqd) |
| throws |
| QpidException |
| { |
| declareAndBind(amqd, new HashMap<String,Object>()); |
| } |
| |
| |
| public void declareAndBind(AMQDestination amqd, Map<String,Object> arguments) |
| throws |
| QpidException |
| { |
| declareExchange(amqd, false); |
| String queueName = declareQueue(amqd, false); |
| bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd); |
| } |
| |
| /** |
| * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. |
| * <p> |
| * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges |
| * all messages that have been delivered to the client. |
| * <p> |
| * Restarting a session causes it to take the following actions: |
| * |
| * <ul> |
| * <li>Stop message delivery.</li> |
| * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". |
| * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. |
| * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> |
| * </ul> |
| * |
| * <p> |
| * If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and |
| * receiving acknowledgment that it has then a JMSException will be thrown. In this case it will not be possible |
| * for the client to determine whether the broker is going to recover the session or not. |
| * |
| * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. |
| * Not that this does not necessarily mean that the recovery has failed, but simply that it is |
| * not possible to tell if it has or not. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| * |
| * Strategy for handling recover. |
| * Flush any acks not yet sent. |
| * Stop the message flow. |
| * Clear the dispatch queue and the consumer queues. |
| * Release/Reject all messages received but not yet acknowledged. |
| * Start the message flow. |
| */ |
| public void recover() throws JMSException |
| { |
| // Ensure that the session is open. |
| checkNotClosed(); |
| |
| // Ensure that the session is not transacted. |
| checkNotTransacted(); |
| |
| |
| try |
| { |
| // flush any acks we are holding in the buffer. |
| flushAcknowledgments(); |
| |
| // this is only set true here, and only set false when the consumers preDeliver method is called |
| _sessionInRecovery = true; |
| |
| boolean isSuspended = isSuspended(); |
| |
| if (!isSuspended) |
| { |
| suspendChannel(true); |
| } |
| |
| // Set to true to short circuit delivery of anything currently |
| //in the pre-dispatch queue. |
| _usingDispatcherForCleanup = true; |
| |
| syncDispatchQueue(false); |
| |
| // Set to false before sending the recover as 0-8/9/9-1 will |
| //send messages back before the recover completes, and we |
| //probably shouldn't clean those! ;-) |
| _usingDispatcherForCleanup = false; |
| |
| if (_dispatcher != null) |
| { |
| _dispatcher.recover(); |
| } |
| |
| sendRecover(); |
| |
| markClean(); |
| |
| if (!isSuspended) |
| { |
| suspendChannel(false); |
| } |
| } |
| catch (QpidException e) |
| { |
| throw toJMSException("Recover failed: " + e.getMessage(), e); |
| } |
| catch (FailoverException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Recovery was interrupted by fail-over. Recovery status is not known."), e); |
| } |
| catch(TransportException e) |
| { |
| throw toJMSException("Recover failed: " + e.getMessage(), e); |
| } |
| } |
| |
| protected abstract void sendRecover() throws QpidException, FailoverException; |
| |
| protected abstract void flushAcknowledgments(); |
| |
| public void rejectMessage(UnprocessedMessage message, boolean requeue) |
| { |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag()); |
| } |
| |
| rejectMessage(message.getDeliveryTag(), requeue); |
| } |
| |
| public void rejectMessage(AbstractJMSMessage message, boolean requeue) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); |
| } |
| |
| rejectMessage(message.getDeliveryTag(), requeue); |
| |
| } |
| |
/**
 * Rejects the message with the given delivery tag. Both message-based overloads delegate to this method.
 *
 * @param deliveryTag delivery tag of the message to reject
 * @param requeue     requeue flag supplied by the caller; its handling is up to the implementation
 */
public abstract void rejectMessage(long deliveryTag, boolean requeue);
| |
| /** |
| * Commits all messages done in this transaction and releases any locks currently held. |
| * <p> |
| * If the rollback fails, because the rollback itself is interrupted by a fail-over between requesting that the |
| * rollback be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. |
| * The client will be unable to determine whether or not the rollback actually happened on the broker in this case. |
| * |
| * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does |
| * not mean that the rollback is known to have failed, merely that it is not known whether it |
| * failed or not. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| public void rollback() throws JMSException |
| { |
| synchronized (_suspensionLock) |
| { |
| checkTransacted(); |
| |
| try |
| { |
| boolean isSuspended = isSuspended(); |
| |
| if (!isSuspended) |
| { |
| suspendChannel(true); |
| } |
| |
| setRollbackMark(); |
| |
| syncDispatchQueue(false); |
| |
| _dispatcher.rollback(); |
| |
| releaseForRollback(); |
| |
| sendRollback(); |
| |
| markClean(); |
| |
| if (!isSuspended) |
| { |
| suspendChannel(false); |
| } |
| } |
| catch (QpidException e) |
| { |
| throw toJMSException("Failed to rollback: " + e, e); |
| } |
| catch (FailoverException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Fail-over interrupted rollback. Status of the rollback is uncertain."), e); |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Failure to rollback:" + e.getMessage(), e); |
| } |
| } |
| } |
| |
/**
 * Subclass hook invoked during {@link #rollback()}, after the dispatcher has rolled back and before the
 * rollback request is sent.
 */
public abstract void releaseForRollback();
| |
/**
 * Protocol-specific step of {@link #rollback()}, invoked while the suspension lock is held.
 *
 * @throws QpidException     declared failure of the rollback request
 * @throws FailoverException declared for the case that fail-over interrupts the request
 */
public abstract void sendRollback() throws QpidException, FailoverException;
| |
| public void run() |
| { |
| throw new java.lang.UnsupportedOperationException(); |
| } |
| |
/**
 * Accepts a session-level message listener but does nothing with it: the body is empty.
 *
 * @param listener ignored
 * @throws JMSException declared by the signature; never thrown by this implementation
 */
public void setMessageListener(MessageListener listener) throws JMSException
{
    // NOTE(review): createConsumerImpl reads _messageListener, but this method never assigns it - confirm
    // whether discarding the listener is intentional.
}
| |
| /** |
| * @see #unsubscribe(String, boolean) |
| */ |
| public void unsubscribe(String name) throws JMSException |
| { |
| try |
| { |
| unsubscribe(name, false); |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Unsubscribe from a subscription. |
| * |
| * @param name the name of the subscription to unsubscribe |
| * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the |
| * queue is not bound, possibly due to the subscription being closed. |
| * @throws JMSException on |
| * @throws InvalidDestinationException |
| */ |
| private void unsubscribe(String name, boolean safe) throws JMSException |
| { |
| TopicSubscriberAdaptor<C> subscriber; |
| |
| _subscriberDetails.lock(); |
| try |
| { |
| checkNotClosed(); |
| subscriber = _subscriptions.get(name); |
| if (subscriber != null) |
| { |
| // Remove saved subscription information |
| _subscriptions.remove(name); |
| _reverseSubscriptionMap.remove(subscriber.getMessageConsumer()); |
| } |
| } |
| finally |
| { |
| _subscriberDetails.unlock(); |
| } |
| |
| if (subscriber != null) |
| { |
| subscriber.close(); |
| |
| // send a queue.delete for the subscription |
| deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); |
| } |
| else |
| { |
| if (_strictAMQP) |
| { |
| if (_strictAMQPFATAL) |
| { |
| throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); |
| } |
| else |
| { |
| _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." |
| + " Requesting queue deletion regardless."); |
| } |
| |
| deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); |
| } |
| else // Queue Browser |
| { |
| |
| if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection))) |
| { |
| deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); |
| } |
| else if (!safe) |
| { |
| throw new InvalidDestinationException("Unknown subscription name: " + name); |
| } |
| } |
| } |
| } |
| |
| protected C createConsumerImpl(final Destination destination, final int prefetchHigh, |
| final int prefetchLow, final boolean noLocal, |
| final boolean exclusive, String selector, final Map<String,Object> rawSelector, |
| final boolean noConsume, final boolean autoClose) throws JMSException |
| { |
| checkTemporaryDestination(destination); |
| |
| if(!noConsume && isBrowseOnlyDestination(destination)) |
| { |
| throw new InvalidDestinationException("The consumer being created is not 'noConsume'," + |
| "but a 'browseOnly' Destination has been supplied."); |
| } |
| |
| final String messageSelector; |
| |
| if (_strictAMQP && !((selector == null) || selector.equals(""))) |
| { |
| if (_strictAMQPFATAL) |
| { |
| throw new UnsupportedOperationException("Selectors not currently supported by AMQP."); |
| } |
| else |
| { |
| messageSelector = null; |
| } |
| } |
| else |
| { |
| messageSelector = selector; |
| } |
| |
| return new FailoverRetrySupport<C, JMSException>( |
| new FailoverProtectedOperation<C, JMSException>() |
| { |
| public C execute() throws JMSException, FailoverException |
| { |
| checkNotClosed(); |
| |
| AMQDestination amqd = (AMQDestination) destination; |
| |
| C consumer; |
| try |
| { |
| consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, |
| noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose); |
| } |
| catch(TransportException e) |
| { |
| throw toJMSException("Exception while creating consumer: " + e.getMessage(), e); |
| } |
| |
| if (_messageListener != null) |
| { |
| consumer.setMessageListener(_messageListener); |
| } |
| |
| try |
| { |
| registerConsumer(consumer, false); |
| } |
| catch (AMQInvalidArgumentException ise) |
| { |
| throw JMSExceptionHelper.chainJMSException(new InvalidSelectorException(ise.getMessage()), |
| ise); |
| } |
| catch (AMQInvalidRoutingKeyException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new InvalidDestinationException( |
| "Invalid routing key:" |
| + amqd.getRoutingKey() |
| ), e); |
| } |
| catch (QpidException e) |
| { |
| if (e instanceof AMQChannelClosedException) |
| { |
| close(-1, false); |
| } |
| |
| throw toJMSException("Error registering consumer: " + e,e); |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Exception while registering consumer:" + e.getMessage(), e); |
| } |
| return consumer; |
| } |
| }, _connection).execute(); |
| } |
| |
| public abstract C createMessageConsumer(final AMQDestination destination, |
| final int prefetchHigh, |
| final int prefetchLow, |
| final boolean noLocal, |
| final boolean exclusive, String selector, |
| final Map<String,Object> arguments, |
| final boolean noConsume, |
| final boolean autoClose) throws JMSException; |
| |
| /** |
| * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer |
| * instance. |
| * |
| * @param consumer the consum |
| */ |
| void deregisterConsumer(C consumer) |
| { |
| if (_consumers.remove(consumer.getConsumerTag()) != null) |
| { |
| _subscriberAccess.lock(); |
| try |
| { |
| String subscriptionName = _reverseSubscriptionMap.remove(consumer); |
| if (subscriptionName != null) |
| { |
| _subscriptions.remove(subscriptionName); |
| } |
| } |
| finally |
| { |
| _subscriberAccess.unlock(); |
| } |
| |
| Destination dest = consumer.getDestination(); |
| synchronized (dest) |
| { |
| // Provide additional NPE check |
| // This would occur if the consumer was closed before it was |
| // fully opened. |
| if (_destinationConsumerCount.get(dest) != null) |
| { |
| if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) |
| { |
| _destinationConsumerCount.remove(dest); |
| } |
| } |
| } |
| } |
| } |
| |
/**
 * Removes the producer registered under the given id from this session's producer map.
 *
 * @param producerId id under which the producer was registered
 */
void deregisterProducer(long producerId)
{
    _producers.remove(producerId);
}
| |
/**
 * @return the current value of the session's in-recovery flag
 */
boolean isInRecovery()
{
    return _sessionInRecovery;
}
| |
| boolean isQueueBound(String exchangeName, String queueName) throws JMSException |
| { |
| return isQueueBound(exchangeName, queueName, null); |
| } |
| |
| /** |
| * Tests whether or not the specified queue is bound to the specified exchange under a particular routing key. |
| * <p> |
| * Note that this operation automatically retries in the event of fail-over. |
| * |
| * @param exchangeName The exchange name to test for binding against. |
| * @param queueName The queue name to check if bound. |
| * @param routingKey The routing key to check if the queue is bound under. |
| * |
| * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. |
| * |
| * @throws JMSException If the query fails for any reason. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| public abstract boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey) |
| throws JMSException; |
| |
| public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; |
| |
/**
 * Tests whether the queue is bound to the exchange under the given binding key and binding arguments.
 *
 * @param exchangeName the exchange to test against
 * @param queueName    the queue to look for
 * @param bindingKey   the binding key to match
 * @param args         binding arguments to match; NOTE(review): null-handling is implementation-defined - confirm
 * @return {@code true} if the implementation finds a matching binding
 * @throws JMSException if the query fails
 */
public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException;
| |
/**
 * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
 * when the client has vetoed resubscription.
 * <p>
 * Sets the closed flag, deregisters the session's channel id from the connection and then marks all producers
 * and consumers as closed, in that order.
 */
void markClosed()
{
    setClosed();
    _connection.deregisterSession(_channelId);
    markClosedProducersAndConsumers();

}
| |
| |
| void syncDispatchQueue(final boolean holdDispatchLock) |
| { |
| if (Thread.currentThread() == _dispatcherThread || holdDispatchLock) |
| { |
| while (!super.isClosed() && !_queue.isEmpty()) |
| { |
| Dispatchable disp; |
| try |
| { |
| disp = _queue.take(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new RuntimeException(e); |
| } |
| |
| // Check just in case _queue becomes empty, it shouldn't but |
| // better than an NPE. |
| if (disp == null) |
| { |
| _logger.debug("_queue became empty during sync."); |
| break; |
| } |
| |
| disp.dispatch(AMQSession.this); |
| } |
| } |
| else |
| { |
| startDispatcherIfNecessary(); |
| |
| final CountDownLatch signal = new CountDownLatch(1); |
| |
| _queue.add(new Dispatchable() |
| { |
| public void dispatch(AMQSession ssn) |
| { |
| signal.countDown(); |
| } |
| }); |
| |
| try |
| { |
| signal.await(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| void drainDispatchQueue() |
| { |
| if (Thread.currentThread() == _dispatcherThread) |
| { |
| while (!super.isClosed() && !_queue.isEmpty()) |
| { |
| Dispatchable disp; |
| try |
| { |
| disp = _queue.take(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new RuntimeException(e); |
| } |
| |
| // Check just in case _queue becomes empty, it shouldn't but |
| // better than an NPE. |
| if (disp == null) |
| { |
| _logger.debug("_queue became empty during sync."); |
| break; |
| } |
| |
| disp.dispatch(AMQSession.this); |
| } |
| } |
| else |
| { |
| startDispatcherIfNecessary(false); |
| |
| final CountDownLatch signal = new CountDownLatch(1); |
| |
| _queue.add(new Dispatchable() |
| { |
| public void dispatch(AMQSession ssn) |
| { |
| signal.countDown(); |
| } |
| }); |
| |
| try |
| { |
| signal.await(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
/**
 * Resubscribes all producers and consumers. This is called when performing failover.
 * <p>
 * Before resubscribing, a dirty session is flagged as "failed over dirty", the rollback markers are reset and
 * the tracked unacknowledged and prefetched delivery tags as well as the resolved destinations are cleared.
 *
 * @throws QpidException declared by the signature; NOTE(review): presumably raised by the resubscribe
 *                       steps - confirm
 */
void resubscribe() throws QpidException
{
    // Carry the dirty state across the failover.
    if (_dirty)
    {
        _failedOverDirty = true;
    }

    // Also reset the delivery tag tracker, to insure we dont
    // return the first <total number of msgs received on session>
    // messages sent by the brokers following the first rollback
    // after failover
    resetRollbackMarkers();

    _unacknowledgedMessageTags.clear();
    _prefetchedMessageTags.clear();

    clearResolvedDestinations();
    resubscribeProducers();
    resubscribeConsumers();
}
| |
/**
 * Resets both the highest-delivery-tag tracker and the rollback mark to {@code -1}.
 */
void resetRollbackMarkers()
{
    _highestDeliveryTag.set(-1);
    _rollbackMark.set(-1);
}
| |
/**
 * Records that this session has message listeners. The flag is only ever set to {@code true} here.
 */
void setHasMessageListeners()
{
    _hasMessageListeners = true;
}
| |
/**
 * Sets the session's in-recovery flag.
 *
 * @param inRecovery the new value of the flag
 */
void setInRecovery(boolean inRecovery)
{
    _sessionInRecovery = inRecovery;
}
| |
/**
 * @return {@code true} once {@link #start()} has been called at least once on this session
 */
boolean isStarted()
{
    return _startedAtLeastOnce.get();
}
| |
| /** |
| * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. |
| * |
| * @throws QpidException If the session cannot be started for any reason. |
| * TODO This should be controlled by _stopped as it pairs with the stop method fixme or check the |
| * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages |
| * for each subsequent call to flow.. only need to do this if we have called stop. |
| */ |
| void start() throws QpidException |
| { |
| // Check if the session has previously been started and suspended, in which case it must be unsuspended. |
| if (_startedAtLeastOnce.getAndSet(true)) |
| { |
| suspendChannel(false); |
| } |
| |
| // If the event dispatcher is not running then start it too. |
| if (hasMessageListeners()) |
| { |
| startDispatcherIfNecessary(); |
| } |
| } |
| |
| void startDispatcherIfNecessary() |
| { |
| //If we are the dispatcher then we don't need to check we are started |
| if (Thread.currentThread() == _dispatcherThread) |
| { |
| return; |
| } |
| |
| // If IMMEDIATE_PREFETCH is not set then we need to start fetching |
| // This is final per session so will be multi-thread safe. |
| if (!_immediatePrefetch) |
| { |
| // We do this now if this is the first call on a started connection |
| if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) |
| { |
| try |
| { |
| suspendChannel(false); |
| } |
| catch (QpidException e) |
| { |
| _logger.info("Unsuspending channel threw an exception:", e); |
| } |
| } |
| } |
| |
| startDispatcherIfNecessary(false); |
| } |
| |
| synchronized void startDispatcherIfNecessary(boolean initiallyStopped) |
| { |
| if (_dispatcher == null) |
| { |
| _dispatcher = new Dispatcher(); |
| try |
| { |
| _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); |
| |
| } |
| catch(Exception e) |
| { |
| throw new Error("Error creating Dispatcher thread",e); |
| } |
| |
| String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber(); |
| |
| _dispatcherThread.setName(dispatcherThreadName); |
| _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD); |
| _dispatcher.setConnectionStopped(initiallyStopped); |
| _dispatcherThread.start(); |
| if (_dispatcherLogger.isDebugEnabled()) |
| { |
| _dispatcherLogger.debug(_dispatcherThread.getName() + " created"); |
| } |
| } |
| else |
| { |
| _dispatcher.setConnectionStopped(initiallyStopped); |
| } |
| } |
| |
/**
 * Stops the session: suspends the channel (unless the session is closing) and then stops the existing
 * dispatcher.
 *
 * @throws QpidException declared by the signature; NOTE(review): presumably from suspending the channel - confirm
 */
void stop() throws QpidException
{
    // Stop the server delivering messages to this session.
    suspendChannelIfNotClosing();
    stopExistingDispatcher();
}
| |
/**
 * Verifies that this session is not transacted.
 *
 * @throws JMSException an {@code IllegalStateException} if the session is transacted
 */
private void checkNotTransacted() throws JMSException
{
    if (getTransacted())
    {
        throw new IllegalStateException("Session is transacted");
    }
}
| |
| private void checkTemporaryDestination(Destination destination) throws JMSException |
| { |
| if ((destination instanceof TemporaryDestination)) |
| { |
| _logger.debug("destination is temporary"); |
| final TemporaryDestination tempDest = (TemporaryDestination) destination; |
| if (tempDest.getSession().getAMQConnection() != this.getAMQConnection()) |
| { |
| _logger.debug("destination is on different conection"); |
| throw new JMSException("Cannot consume from a temporary destination created on another connection"); |
| } |
| |
| if (tempDest.isDeleted()) |
| { |
| _logger.debug("destination is deleted"); |
| throw new JMSException("Cannot consume from a deleted destination"); |
| } |
| } |
| } |
| |
/**
 * Verifies that this session is transacted.
 *
 * @throws JMSException an {@code IllegalStateException} if the session is not transacted
 */
protected void checkTransacted() throws JMSException
{
    if (!getTransacted())
    {
        throw new IllegalStateException("Session is not transacted");
    }
}
| |
/**
 * Rejects a {@code null} destination.
 *
 * @param destination the destination to validate
 * @throws InvalidDestinationException if {@code destination} is {@code null}
 */
private void checkValidDestination(Destination destination) throws InvalidDestinationException
{
    if (destination == null)
    {
        // NOTE(review): the message says "Invalid Queue" although any kind of destination is checked here.
        throw new javax.jms.InvalidDestinationException("Invalid Queue");
    }
}
| |
/**
 * Rejects a {@code null} queue.
 *
 * @param queue the queue to validate
 * @throws InvalidDestinationException if {@code queue} is {@code null}
 */
private void checkValidQueue(Queue queue) throws InvalidDestinationException
{
    if (queue == null)
    {
        throw new javax.jms.InvalidDestinationException("Invalid Queue");
    }
}
| |
| /* |
| * I could have combined the last 3 methods, but this way it improves readability |
| */ |
| protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException |
| { |
| if (topic == null) |
| { |
| throw new javax.jms.InvalidDestinationException("Invalid Topic"); |
| } |
| |
| if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this)) |
| { |
| throw new javax.jms.InvalidDestinationException( |
| "Cannot create a subscription on a temporary topic created in another session"); |
| } |
| |
| if ((topic instanceof TemporaryDestination) && durable) |
| { |
| throw new javax.jms.InvalidDestinationException |
| ("Cannot create a durable subscription with a temporary topic: " + topic); |
| } |
| |
| if (!(topic instanceof AMQDestination)) |
| { |
| throw new javax.jms.InvalidDestinationException( |
| "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " |
| + topic.getClass().getName()); |
| } |
| |
| return topic; |
| } |
| |
/**
 * Validates a topic for a non-durable subscription.
 *
 * @param topic the topic to validate
 * @return the validated topic, unchanged
 * @throws JMSException if the topic fails validation
 */
protected Topic checkValidTopic(Topic topic) throws JMSException
{
    return checkValidTopic(topic, false);
}
| |
| /** |
| * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. |
| * |
| * @param error not null if this is a result of an error occurring at the connection level |
| */ |
| private void closeConsumers(Throwable error) throws JMSException |
| { |
| // we need to clone the list of consumers since the close() method updates the _consumers collection |
| // which would result in a concurrent modification exception |
| final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); |
| |
| final Iterator<C> it = clonedConsumers.iterator(); |
| while (it.hasNext()) |
| { |
| final C con = it.next(); |
| if (error != null) |
| { |
| con.notifyError(error); |
| } |
| else |
| { |
| con.close(false); |
| } |
| } |
| // at this point the _consumers map will be empty |
| if (_dispatcher != null) |
| { |
| _dispatcher.close(); |
| _dispatcher = null; |
| } |
| } |
| |
| /** |
| * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is |
| * currently no way of propagating errors to message producers (this is a JMS limitation). |
| */ |
| private void closeProducers() throws JMSException |
| { |
| // we need to clone the list of producers since the close() method updates the _producers collection |
| // which would result in a concurrent modification exception |
| final ArrayList clonedProducers = new ArrayList(_producers.values()); |
| |
| final Iterator it = clonedProducers.iterator(); |
| while (it.hasNext()) |
| { |
| final P prod = (P) it.next(); |
| prod.close(); |
| } |
| // at this point the _producers map is empty |
| } |
| |
| /** |
| * Close all producers or consumers. This is called either in the error case or when closing the session normally. |
| * |
| * @param amqe the exception, may be null to indicate no error has occurred |
| */ |
| private void closeProducersAndConsumers(QpidException amqe) throws JMSException |
| { |
| JMSException jmse = null; |
| try |
| { |
| closeProducers(); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing session: " + e, e); |
| jmse = e; |
| } |
| |
| try |
| { |
| closeConsumers(amqe); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing session: " + e, e); |
| if (jmse == null) |
| { |
| jmse = e; |
| } |
| } |
| |
| if (jmse != null) |
| { |
| throw jmse; |
| } |
| } |
| |
| /** |
| * Register to consume from the queue. |
| * @param queueName |
| */ |
| private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws QpidException, FailoverException |
| { |
| Link link = consumer.getDestination().getLink(); |
| String linkName; |
| if(link != null && link.getName() != null && consumer.getDestination().getAddressType() == AMQDestination.QUEUE_TYPE) |
| { |
| linkName = link.getName(); |
| } |
| else |
| { |
| linkName = String.valueOf(_nextTag++); |
| } |
| |
| consumer.setConsumerTag(linkName); |
| // we must register the consumer in the map before we actually start listening |
| _consumers.put(linkName, consumer); |
| |
| synchronized (consumer.getDestination()) |
| { |
| _destinationConsumerCount.putIfAbsent(consumer.getDestination(), new AtomicInteger()); |
| _destinationConsumerCount.get(consumer.getDestination()).incrementAndGet(); |
| } |
| |
| |
| try |
| { |
| sendConsume(consumer, queueName, nowait); |
| } |
| catch (QpidException e) |
| { |
| // clean-up the map in the event of an error |
| _consumers.remove(linkName); |
| throw e; |
| } |
| } |
| |
/**
 * Creates the bindings declared on the destination's link.
 *
 * @param dest destination whose link bindings are created; its link must not be {@code null}
 * @throws QpidException if creating a binding fails
 */
void handleLinkCreation(AMQDestination dest) throws QpidException
{
    createBindings(dest, dest.getLink().getBindings());
}
| |
| |
| void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws QpidException |
| { |
| String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest |
| .getAddressName() : "amq.topic"; |
| |
| String defaultQueueName = null; |
| if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) |
| { |
| defaultQueueName = dest.getQueueName(); |
| } |
| else |
| { |
| defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); |
| } |
| |
| for (AMQDestination.Binding binding: bindings) |
| { |
| String queue = binding.getQueue() == null? |
| defaultQueueName: binding.getQueue(); |
| |
| String exchange = binding.getExchange() == null ? |
| defaultExchangeForBinding : |
| binding.getExchange(); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Binding queue : " + queue + |
| " exchange: " + exchange + |
| " using binding key " + binding.getBindingKey() + |
| " with args " + Strings.printMap(binding.getArgs())); |
| } |
| doBind(dest, binding, queue, exchange); |
| } |
| } |
| |
/**
 * Subclass hook. NOTE(review): judging by the name it creates the queue node for an address-based
 * destination; it is not called in the visible part of this file - confirm against the implementations.
 *
 * @param dest    the destination concerned
 * @param noLocal no-local flag supplied by the caller
 * @throws QpidException declared failure of the operation
 */
protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws QpidException;
| |
/**
 * Subclass hook. NOTE(review): judging by the name it creates the exchange node for an address-based
 * destination; it is not called in the visible part of this file - confirm against the implementations.
 *
 * @param dest the destination concerned
 * @throws QpidException declared failure of the operation
 */
abstract void handleExchangeNodeCreation(AMQDestination dest) throws QpidException;
| |
| abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) |
| throws QpidException; |
| |
/**
 * Protocol-specific consume request; called by {@code consumeFromQueue} after the consumer has been given its
 * tag and registered with the session.
 *
 * @param consumer  the consumer to start
 * @param queueName name of the queue to consume from
 * @param nowait    no-wait flag supplied by the caller
 * @throws QpidException     if the request fails
 * @throws FailoverException declared for the case that fail-over interrupts the request
 */
public abstract void sendConsume(C consumer, String queueName,
                                 boolean nowait) throws QpidException, FailoverException;
| |
| private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) |
| throws JMSException |
| { |
| return new FailoverRetrySupport<P, JMSException>( |
| new FailoverProtectedOperation<P, JMSException>() |
| { |
| public P execute() throws JMSException, FailoverException |
| { |
| checkNotClosed(); |
| long producerId = getNextProducerId(); |
| |
| P producer; |
| try |
| { |
| producer = createMessageProducer(destination, mandatory, |
| immediate, producerId); |
| } |
| catch (TransportException e) |
| { |
| throw toJMSException("Exception while creating producer:" + e.getMessage(), e); |
| } |
| |
| registerProducer(producerId, producer); |
| |
| return producer; |
| } |
| }, _connection).execute(); |
| } |
| |
| public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, |
| final Boolean immediate, final long producerId) throws JMSException; |
| |
/**
 * Declares the exchange described by the destination, taking name, class, durability, auto-delete and
 * internal settings from the destination itself.
 *
 * @param amqd   destination whose exchange is declared
 * @param nowait passed through to the exchange declaration
 * @throws QpidException if the exchange cannot be declared
 */
private void declareExchange(AMQDestination amqd, boolean nowait) throws QpidException
{
    declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(),
                    amqd.isExchangeAutoDelete(), amqd.isExchangeInternal());
}
| |
/**
 * Returns the number of messages currently queued for the given destination, without syncing the session
 * first.
 *
 * @param amqd The destination to be checked
 *
 * @return the number of queued messages.
 *
 * @throws QpidException If the queue depth cannot be obtained for any reason.
 */
public long getQueueDepth(final AMQDestination amqd)
        throws QpidException
{
    return getQueueDepth(amqd, false);
}
| |
| /** |
| * Returns the number of messages currently queued by the given |
| * destination. Syncs session before receiving the queue depth if sync is |
| * set to true. |
| * |
| * @param amqd AMQ destination to get the depth value |
| * @param sync flag to sync session before receiving the queue depth |
| * @return queue depth |
| * @throws QpidException |
| */ |
| public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws QpidException |
| { |
| return new FailoverNoopSupport<Long, QpidException>(new FailoverProtectedOperation<Long, QpidException>() |
| { |
| public Long execute() throws QpidException, FailoverException |
| { |
| try |
| { |
| return requestQueueDepth(amqd, sync); |
| } |
| catch (TransportException e) |
| { |
| throw new AMQException(getErrorCode(e), e.getMessage(), e); |
| } |
| } |
| }, _connection).execute(); |
| } |
| |
/**
 * Protocol-specific query backing {@code getQueueDepth}.
 *
 * @param amqd destination whose queue depth is requested
 * @param sync sync flag supplied by the caller
 * @return the queue depth; unboxed by the caller, so presumably never {@code null} - confirm
 * @throws QpidException     if the query fails
 * @throws FailoverException declared for the case that fail-over interrupts the query
 */
protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws QpidException, FailoverException;
| |
| /** |
| * Declares the named exchange and type of exchange. |
| * <p> |
| * Note that this operation automatically retries in the event of fail-over. |
| * |
| * @param name The name of the exchange to declare. |
| * @param type The type of the exchange to declare. |
| * @param nowait |
| * @param durable |
| * @param autoDelete |
| * @param internal |
| * @throws QpidException If the exchange cannot be declared for any reason. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| void declareExchange(final String name, final String type, |
| final boolean nowait, final boolean durable, |
| final boolean autoDelete, final boolean internal) throws QpidException |
| { |
| new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() |
| { |
| public Object execute() throws QpidException, FailoverException |
| { |
| sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal); |
| return null; |
| } |
| }, _connection).execute(); |
| } |
| |
| void declareExchange(final String name, final String type, |
| final boolean nowait, final boolean durable, |
| final boolean autoDelete, final Map<String,Object> arguments, |
| final boolean passive) throws QpidException |
| { |
| new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() |
| { |
| public Object execute() throws QpidException, FailoverException |
| { |
| sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive); |
| return null; |
| } |
| }, _connection).execute(); |
| } |
| |
| protected String preprocessAddressTopic(final C consumer, |
| String queueName) throws QpidException |
| { |
| if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) |
| { |
| if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) |
| { |
| String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); |
| |
| createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); |
| queueName = consumer.getDestination().getAMQQueueName(); |
| consumer.setQueuename(queueName); |
| } |
| handleLinkCreation(consumer.getDestination()); |
| } |
| return queueName; |
| } |
| |
/**
 * Protocol-specific creation of the subscription queue for an address-syntax topic; called by
 * {@code preprocessAddressTopic} before the destination's queue name is read back.
 *
 * @param dest            the topic destination
 * @param noLocal         no-local flag of the consumer
 * @param messageSelector selector of the consumer, or {@code null} if it has no selector filter
 * @throws QpidException if the queue cannot be created
 */
abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws
                                                                                                    QpidException;
| |
| public abstract void sendExchangeDeclare(final String name, final String type, final boolean nowait, |
| boolean durable, boolean autoDelete, boolean internal) throws |
| QpidException, FailoverException; |
| |
| |
| public abstract void sendExchangeDeclare(final String name, |
| final String type, |
| final boolean nowait, |
| boolean durable, |
| boolean autoDelete, |
| Map<String,Object> arguments, |
| final boolean passive) throws QpidException, FailoverException; |
| |
/**
 * Declares a queue for a JMS destination.
 * <p>
 * Note that for queues but not topics the name is generated in the client rather than the server. This allows
 * the name to be reused on failover if required. In general, the destination indicates whether it wants a name
 * generated or not.
 * <p>
 * Note that this operation automatically retries in the event of fail-over.
 * <p>
 * This overload declares the queue with {@code nowait} set to {@code false}.
 *
 * @param amqd    The destination to declare as a queue.
 * @param noLocal passed through to the protocol-specific declaration
 * @return The name of the declared queue. This is useful where the broker is generating a queue name on behalf of
 *         the client.
 *
 * @throws QpidException If the queue cannot be declared for any reason.
 * TODO Verify the destination is valid or throw an exception.
 * TODO Be aware of possible changes to parameter order as versions change.
 */
protected String declareQueue(final AMQDestination amqd,
                              final boolean noLocal) throws QpidException
{
    return declareQueue(amqd, noLocal, false);
}
| |
/**
 * Declares a queue for a JMS destination as a non-passive declaration.
 *
 * @param amqd    the destination to declare as a queue
 * @param noLocal passed through to the protocol-specific declaration
 * @param nowait  passed through to the protocol-specific declaration
 * @return the name of the declared queue
 * @throws QpidException if the queue cannot be declared
 */
protected String declareQueue(final AMQDestination amqd,
                              final boolean noLocal, final boolean nowait)
        throws QpidException
{
    return declareQueue(amqd, noLocal, nowait, false);
}
| |
| protected abstract String declareQueue(final AMQDestination amqd, |
| final boolean noLocal, final boolean nowait, final boolean passive) throws |
| QpidException; |
| |
| /** |
| * Undeclares the specified queue. |
| * <p> |
| * Note that this operation automatically retries in the event of fail-over. |
| * |
| * @param queueName The name of the queue to delete. |
| * |
| * @throws JMSException If the queue could not be deleted for any reason. |
| */ |
| @Override |
| public void deleteQueue(final String queueName) throws JMSException |
| { |
| try |
| { |
| new FailoverRetrySupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() |
| { |
| public Object execute() throws QpidException, FailoverException |
| { |
| sendQueueDelete(queueName); |
| return null; |
| } |
| }, _connection).execute(); |
| } |
| catch (QpidException e) |
| { |
| throw toJMSException("The queue deletion failed: " + e.getMessage(), e); |
| } |
| } |
| |
/**
 * Undeclares the specified temporary queue/topic by deleting the queue that backs it.
 * <p>
 * Note that this operation automatically retries in the event of fail-over.
 *
 * @param amqQueue The temporary destination to delete.
 *
 * @throws JMSException If the queue could not be deleted for any reason.
 * TODO Be aware of possible changes to parameter order as versions change.
 */
protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
{
    deleteQueue(amqQueue.getAMQQueueName());
}
| |
| public abstract void sendQueueDelete(final String queueName) throws QpidException, FailoverException; |
| |
/**
 * Allocates the next producer id by pre-incrementing the session's counter.
 * NOTE(review): the increment is not synchronized here - confirm that callers serialize access.
 *
 * @return the newly allocated id
 */
private long getNextProducerId()
{
    return ++_nextProducerId;
}
| |
| protected boolean hasMessageListeners() |
| { |
| return _hasMessageListeners; |
| } |
| |
| private void markClosedConsumers() throws JMSException |
| { |
| if (_dispatcher != null) |
| { |
| _dispatcher.close(); |
| _dispatcher = null; |
| } |
| // we need to clone the list of consumers since the close() method updates the _consumers collection |
| // which would result in a concurrent modification exception |
| final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values()); |
| |
| final Iterator<C> it = clonedConsumers.iterator(); |
| while (it.hasNext()) |
| { |
| final C con = it.next(); |
| con.markClosed(); |
| } |
| // at this point the _consumers map will be empty |
| } |
| |
| private void markClosedProducersAndConsumers() |
| { |
| try |
| { |
| // no need for a markClosed* method in this case since there is no protocol traffic closing a producer |
| closeProducers(); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing session: " + e, e); |
| } |
| |
| try |
| { |
| markClosedConsumers(); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing session: " + e, e); |
| } |
| } |
| |
| /** |
| * Callers must hold the failover mutex before calling this method. |
| * |
| * @param consumer |
| * |
| * @throws QpidException |
| */ |
| private void registerConsumer(C consumer, boolean nowait) throws QpidException |
| { |
| AMQDestination amqd = consumer.getDestination(); |
| |
| if (amqd.getDestSyntax() == DestSyntax.ADDR) |
| { |
| resolveAddress(amqd,true,consumer.isNoLocal()); |
| } |
| else |
| { |
| if (_declareExchanges && !amqd.neverDeclare()) |
| { |
| declareExchange(amqd, nowait); |
| } |
| |
| if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) |
| { |
| declareQueue(amqd, consumer.isNoLocal(), nowait); |
| } |
| if (_bindQueues && !amqd.neverDeclare() && !amqd.isDefaultExchange()) |
| { |
| if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) |
| { |
| bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), |
| amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait); |
| } |
| } |
| |
| } |
| |
| String queueName = amqd.getAMQQueueName(); |
| |
| // store the consumer queue name |
| consumer.setQueuename(queueName); |
| |
| // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch |
| if (!_immediatePrefetch) |
| { |
| // The dispatcher will be null if we have just created this session |
| // so suspend the channel before we register our consumer so that we don't |
| // start prefetching until a receive/mListener is set. |
| if (_dispatcher == null) |
| { |
| if (!isSuspended()) |
| { |
| try |
| { |
| suspendChannel(true); |
| _logger.debug( |
| "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); |
| } |
| catch (QpidException e) |
| { |
| _logger.info("Suspending channel threw an exception:", e); |
| } |
| } |
| } |
| } |
| else |
| { |
| _logger.debug("Immediately prefetching existing messages to new consumer."); |
| } |
| |
| try |
| { |
| consumeFromQueue(consumer, queueName, nowait); |
| } |
| catch (FailoverException e) |
| { |
| throw new QpidException("Fail-over exception interrupted basic consume.", e); |
| } |
| } |
| |
/**
 * Reports whether a binding exists for the given exchange, queue and routing key; implemented
 * by subclasses.
 * <p>
 * {@code registerConsumer} binds the queue only when this returns {@code false}.
 *
 * @param exchangeName the exchange name
 * @param amqQueueName the queue name
 * @param routingKey   the routing key
 *
 * @return true if the binding already exists, false otherwise
 *
 * @throws QpidException if the check fails
 */
protected abstract boolean isBound(String exchangeName, String amqQueueName, String routingKey)
        throws QpidException;
| |
| private void registerProducer(long producerId, MessageProducer producer) |
| { |
| _producers.put(producerId, producer); |
| } |
| |
| private void rejectMessagesForConsumerTag(String consumerTag) |
| { |
| Iterator<Dispatchable> messages = _queue.iterator(); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ")"); |
| |
| if (messages.hasNext()) |
| { |
| _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); |
| } |
| else |
| { |
| _logger.debug("No messages in _queue to reject"); |
| } |
| } |
| while (messages.hasNext()) |
| { |
| UnprocessedMessage message = (UnprocessedMessage) messages.next(); |
| |
| if (message.getConsumerTag().equals(consumerTag)) |
| { |
| |
| if (_queue.remove(message)) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" |
| + message.getDeliveryTag()); |
| } |
| |
| rejectMessage(message, true); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejected the message(" + message.toString() + ") for consumer :" + consumerTag); |
| } |
| } |
| } |
| } |
| } |
| |
| private void resubscribeConsumers() throws QpidException |
| { |
| ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); |
| _consumers.clear(); |
| |
| for (C consumer : consumers) |
| { |
| consumer.failedOverPre(); |
| registerConsumer(consumer, true); |
| consumer.failedOverPost(); |
| } |
| } |
| |
| private void resubscribeProducers() throws QpidException |
| { |
| ArrayList producers = new ArrayList(_producers.values()); |
| _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", |
| producers, |
| producers.size())); // FIXME: removeKey |
| for (Iterator it = producers.iterator(); it.hasNext();) |
| { |
| P producer = (P) it.next(); |
| producer.resubscribe(); |
| } |
| } |
| |
| /** |
| * Suspends or unsuspends this session. |
| * |
| * @param suspend true indicates that the session should be suspended, false indicates that it |
| * should be unsuspended. |
| * |
| * @throws QpidException If the session cannot be suspended for any reason. |
| * TODO Be aware of possible changes to parameter order as versions change. |
| */ |
| protected void suspendChannel(boolean suspend) throws QpidException |
| { |
| synchronized (_suspensionLock) |
| { |
| try |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); |
| } |
| |
| _suspended = suspend; |
| sendSuspendChannel(suspend); |
| } |
| catch (FailoverException e) |
| { |
| throw new QpidException("Fail-over interrupted suspend/unsuspend channel.", e); |
| } |
| catch (TransportException e) |
| { |
| throw new AMQException(getErrorCode(e), e.getMessage(), e); |
| } |
| } |
| } |
| |
/**
 * Sends the suspend/unsuspend request for this session; implemented by subclasses.
 * <p>
 * Called by {@code suspendChannel} while it holds {@code _suspensionLock}.
 *
 * @param suspend true to suspend, false to unsuspend
 *
 * @throws QpidException     if the request fails
 * @throws FailoverException NOTE(review): presumably when fail-over interrupts the request - confirm
 */
public abstract void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException;
| |
| boolean tryLockMessageDelivery() |
| { |
| try |
| { |
| // Use timeout of zero to respect fairness. See ReentrantLock#tryLock JavaDocs for details. |
| return _messageDeliveryLock.tryLock(0, TimeUnit.SECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| return false; |
| } |
| } |
| |
| void lockMessageDelivery() |
| { |
| _messageDeliveryLock.lock(); |
| } |
| |
| void unlockMessageDelivery() |
| { |
| _messageDeliveryLock.unlock(); |
| } |
| |
| /** |
| * Indicates whether this session consumers pre-fetche messages |
| * |
| * @return true if this session consumers pre-fetche messages false otherwise |
| */ |
| public boolean prefetch() |
| { |
| return _prefetchHighMark > 0; |
| } |
| |
| /** Signifies that the session has pending sends to commit. */ |
| public void markDirty() |
| { |
| _dirty = true; |
| } |
| |
| /** Signifies that the session has no pending sends to commit. */ |
| public void markClean() |
| { |
| _dirty = false; |
| _failedOverDirty = false; |
| } |
| |
| /** |
| * Check to see if failover has occured since the last call to markClean(commit or rollback). |
| * |
| * @return boolean true if failover has occured. |
| */ |
| public boolean hasFailedOverDirty() |
| { |
| return _failedOverDirty; |
| } |
| |
| public void setTicket(int ticket) |
| { |
| _ticket = ticket; |
| } |
| |
/**
 * Tests whether flow to this session is blocked; implemented by subclasses.
 *
 * @return true if flow is blocked or false otherwise.
 */
public abstract boolean isFlowBlocked();
| |
/**
 * Updates the flow control state of this session; implemented by subclasses.
 *
 * @param active NOTE(review): presumably true when flow is active (not blocked) - confirm in
 *               implementations
 */
public abstract void setFlowControl(final boolean active);
| |
| Object getDispatcherLock() |
| { |
| Dispatcher dispatcher = _dispatcher; |
| return dispatcher == null ? null : dispatcher._lock; |
| } |
| |
| public String createTemporaryQueueName() |
| { |
| String prefix = _connection.getTemporaryQueuePrefix(); |
| assert(prefix.isEmpty() || prefix.endsWith("/")); |
| return prefix + "TempQueue" + UUID.randomUUID(); |
| } |
| |
/**
 * An item that can be dispatched against a session.
 * <p>
 * The {@code Dispatcher} takes {@code Dispatchable} entries from the session's queue and calls
 * {@link #dispatch(AMQSession)} on each, passing the enclosing session.
 */
public interface Dispatchable
{
    /**
     * Dispatches this item.
     *
     * @param ssn the session the item is dispatched against
     */
    void dispatch(AMQSession ssn);
}
| |
| public void dispatch(UnprocessedMessage message) |
| { |
| if (_dispatcher == null) |
| { |
| throw new java.lang.IllegalStateException("dispatcher is not started"); |
| } |
| |
| _dispatcher.dispatchMessage(message); |
| } |
| |
| /** Used for debugging in the dispatcher. */ |
| private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); |
| |
| /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ |
| class Dispatcher implements Runnable |
| { |
| |
| /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ |
| private final AtomicBoolean _closed = new AtomicBoolean(false); |
| private final CountDownLatch _closeCompleted = new CountDownLatch(1); |
| |
| private final Object _lock = new Object(); |
| private final String dispatcherID = "" + System.identityHashCode(this); |
| |
| public Dispatcher() |
| { |
| } |
| |
| public void close() |
| { |
| _closed.set(true); |
| _queue.close(); |
| _dispatcherThread.interrupt(); |
| |
| // If we are not the dispatcherThread we need to await the exiting of the Dispatcher#run(). See QPID-6672. |
| if (Thread.currentThread() != _dispatcherThread) |
| { |
| try |
| { |
| if(!_closeCompleted.await(_dispatcherShutdownTimeoutMs, TimeUnit.MILLISECONDS)) |
| { |
| throw new RuntimeException("Dispatcher did not close down within the timeout of " + _dispatcherShutdownTimeoutMs + " ms."); |
| } |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| private AtomicBoolean getClosed() |
| { |
| return _closed; |
| } |
| |
| |
| public void rollback() |
| { |
| |
| synchronized (_lock) |
| { |
| boolean isStopped = connectionStopped(); |
| |
| if (!isStopped) |
| { |
| setConnectionStopped(true); |
| } |
| |
| setRollbackMark(); |
| |
| _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); |
| |
| for (C consumer : _consumers.values()) |
| { |
| if (!consumer.isBrowseOnly()) |
| { |
| consumer.releasePendingMessages(); |
| } |
| else |
| { |
| // should perhaps clear the _SQ here. |
| consumer.clearReceiveQueue(); |
| } |
| |
| } |
| |
| setConnectionStopped(isStopped); |
| } |
| |
| } |
| |
| public void recover() |
| { |
| |
| synchronized (_lock) |
| { |
| boolean isStopped = connectionStopped(); |
| |
| if (!isStopped) |
| { |
| setConnectionStopped(true); |
| } |
| |
| _dispatcherLogger.debug("Session clearing the consumer queues"); |
| |
| for (C consumer : _consumers.values()) |
| { |
| List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); |
| _prefetchedMessageTags.addAll(tags); |
| } |
| |
| setConnectionStopped(isStopped); |
| } |
| |
| } |
| |
| |
| public void run() |
| { |
| try |
| { |
| if (_dispatcherLogger.isDebugEnabled()) |
| { |
| _dispatcherLogger.debug(_dispatcherThread.getName() + " started"); |
| } |
| |
| // Allow dispatcher to start stopped |
| synchronized (_lock) |
| { |
| while (!_closed.get() && connectionStopped()) |
| { |
| try |
| { |
| _lock.wait(); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| try |
| { |
| |
| while (((_queue.blockingPeek()) != null) && !_closed.get()) |
| { |
| synchronized (_lock) |
| { |
| if (!isClosed() && !isClosing() && !_closed.get()) |
| { |
| Dispatchable disp = _queue.nonBlockingTake(); |
| |
| if(disp != null) |
| { |
| disp.dispatch(AMQSession.this); |
| } |
| } |
| } |
| } |
| } |
| catch (InterruptedException e) |
| { |
| // ignored as run will exit immediately |
| } |
| } |
| finally |
| { |
| _closeCompleted.countDown(); |
| if (_dispatcherLogger.isDebugEnabled()) |
| { |
| _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this); |
| } |
| } |
| } |
| |
| // only call while holding lock |
| final boolean connectionStopped() |
| { |
| return _connectionStopped.get(); |
| } |
| |
| boolean setConnectionStopped(boolean connectionStopped) |
| { |
| boolean currently = _connectionStopped.get(); |
| if(connectionStopped != currently) |
| { |
| synchronized (_lock) |
| { |
| _connectionStopped.set(connectionStopped); |
| _lock.notifyAll(); |
| |
| if (_dispatcherLogger.isDebugEnabled()) |
| { |
| _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped |
| ? "Stopped" |
| : "Started") |
| + ": Currently " + (currently ? "Stopped" : "Started")); |
| } |
| } |
| } |
| return currently; |
| } |
| |
| private void dispatchMessage(UnprocessedMessage message) |
| { |
| long deliveryTag = message.getDeliveryTag(); |
| |
| synchronized (_lock) |
| { |
| |
| try |
| { |
| while (!getAMQConnection().isFailingOver() && connectionStopped()) |
| { |
| _lock.wait(); |
| } |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| |
| if (!(message instanceof CloseConsumerMessage) |
| && tagLE(deliveryTag, _rollbackMark.get())) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejecting message because delivery tag " + deliveryTag |
| + " <= rollback mark " + _rollbackMark.get()); |
| } |
| rejectMessage(message, true); |
| } |
| else if (_usingDispatcherForCleanup) |
| { |
| _prefetchedMessageTags.add(deliveryTag); |
| } |
| else |
| { |
| while (!isClosed() && !isClosing()) |
| { |
| if (tryLockMessageDelivery()) |
| { |
| try |
| { |
| notifyConsumer(message); |
| break; |
| } |
| finally |
| { |
| unlockMessageDelivery(); |
| } |
| } |
| } |
| } |
| } |
| |
| long current = _rollbackMark.get(); |
| if (updateRollbackMark(current, deliveryTag)) |
| { |
| _rollbackMark.compareAndSet(current, deliveryTag); |
| } |
| } |
| |
| private void notifyConsumer(UnprocessedMessage message) |
| { |
| final C consumer = _consumers.get(message.getConsumerTag()); |
| |
| if ((consumer == null) || consumer.isClosed() || consumer.isClosing()) |
| { |
| if (_dispatcherLogger.isInfoEnabled()) |
| { |
| if (consumer == null) |
| { |
| _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" |
| + System.identityHashCode(message) + ")" + "[" |
| + message.getDeliveryTag() + "] from queue " |
| + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); |
| } |
| else |
| { |
| if (consumer.isBrowseOnly()) |
| { |
| _dispatcherLogger.info("Received a message(" |
| + System.identityHashCode(message) + ")" + "[" |
| + message.getDeliveryTag() + "] from queue " + " consumer(" |
| + message.getConsumerTag() + ") is closed and a browser so dropping..."); |
| //DROP MESSAGE |
| return; |
| |
| } |
| else |
| { |
| _dispatcherLogger.info("Received a message(" |
| + System.identityHashCode(message) + ")" + "[" |
| + message.getDeliveryTag() + "] from queue " + " consumer(" |
| + message.getConsumerTag() + ") is closed rejecting(requeue)..."); |
| } |
| } |
| } |
| // Don't reject if we're already closing |
| if (!_closed.get()) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() |
| + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag())); |
| } |
| rejectMessage(message, true); |
| } |
| } |
| else |
| { |
| consumer.notifyMessage(message); |
| } |
| } |
| } |
| |
/**
 * Compares two delivery tags; implemented by subclasses.
 * <p>
 * The dispatcher rejects a message when {@code tagLE(deliveryTag, rollbackMark)} is true, and
 * logs that case as "delivery tag &lt;= rollback mark".
 *
 * @param tag1 the first tag
 * @param tag2 the second tag
 *
 * @return true if {@code tag1} is to be treated as less than or equal to {@code tag2}
 */
protected abstract boolean tagLE(long tag1, long tag2);
| |
/**
 * Decides whether the rollback mark should be moved to the given delivery tag; implemented by
 * subclasses.
 * <p>
 * When this returns true the dispatcher attempts a compare-and-set of the rollback mark from
 * {@code current} to {@code deliveryTag}.
 *
 * @param current     the current rollback mark
 * @param deliveryTag the delivery tag of the message just dispatched
 *
 * @return true if the rollback mark should be updated
 */
protected abstract boolean updateRollbackMark(long current, long deliveryTag);
| |
/**
 * @return the message delegate factory for this session; supplied by subclasses
 */
public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
| |
| private class SuspenderRunner implements Runnable |
| { |
| private AtomicBoolean _suspend; |
| |
| public SuspenderRunner(AtomicBoolean suspend) |
| { |
| _suspend = suspend; |
| } |
| |
| public void run() |
| { |
| try |
| { |
| synchronized (_suspensionLock) |
| { |
| // If the session has closed by the time we get here |
| // then we should not attempt to write to the session/channel. |
| if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) |
| { |
| suspendChannel(_suspend.get()); |
| } |
| } |
| } |
| catch (QpidException e) |
| { |
| _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Is the _queue empty?" + _queue.isEmpty()); |
| _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed())); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks if the Session and its parent connection are closed |
| * |
| * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise. |
| */ |
| @Override |
| public boolean isClosed() |
| { |
| return super.isClosed() || _connection.isClosed(); |
| } |
| |
| public boolean isSessionClosed() |
| { |
| return super.isClosed(); |
| } |
| |
| /** |
| * Checks if the Session and its parent connection are capable of performing |
| * closing operations |
| * |
| * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise. |
| */ |
| @Override |
| public boolean isClosing() |
| { |
| return super.isClosing() || _connection.isClosing(); |
| } |
| |
| public boolean isDeclareExchanges() |
| { |
| return _declareExchanges; |
| } |
| |
| JMSException toJMSException(String message, TransportException e) |
| { |
| int code = getErrorCode(e); |
| return JMSExceptionHelper.chainJMSException(new JMSException(message, Integer.toString(code)), e); |
| } |
| |
| private int getErrorCode(TransportException e) |
| { |
| int code = ErrorCodes.INTERNAL_ERROR; |
| if (e instanceof SessionException) |
| { |
| SessionException se = (SessionException) e; |
| if(se.getException() != null && se.getException().getErrorCode() != null) |
| { |
| code = se.getException().getErrorCode().getValue(); |
| } |
| } |
| return code; |
| } |
| |
| JMSException toJMSException(String message, QpidException e) |
| { |
| JMSException ex; |
| int errorCode = 0; |
| |
| if (e instanceof AMQException) |
| { |
| errorCode = ((AMQException) e).getErrorCode(); |
| } |
| |
| if (errorCode == ErrorCodes.ACCESS_REFUSED) |
| { |
| ex = JMSExceptionHelper.chainJMSException(new JMSSecurityException(message, |
| Integer.toString(errorCode)), e); |
| } |
| else |
| { |
| ex = JMSExceptionHelper.chainJMSException(new JMSException(message, errorCode == 0 ? null : Integer.toString(errorCode)), e); |
| } |
| return ex; |
| } |
| |
| private boolean isBrowseOnlyDestination(Destination destination) |
| { |
| return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()); |
| } |
| |
| private void setRollbackMark() |
| { |
| // Let the dispatcher know that all the incomming messages |
| // should be rolled back(reject/release) |
| _rollbackMark.set(_highestDeliveryTag.get()); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Rollback mark is set to " + _rollbackMark.get()); |
| } |
| } |
| |
| |
| public MessageEncryptionHelper getMessageEncryptionHelper() |
| { |
| return _messageEncryptionHelper; |
| } |
| |
| protected void drainDispatchQueueWithDispatcher() |
| { |
| if (!_queue.isEmpty()) |
| { |
| try |
| { |
| setUsingDispatcherForCleanup(true); |
| drainDispatchQueue(); |
| } |
| finally |
| { |
| setUsingDispatcherForCleanup(false); |
| } |
| } |
| } |
| |
| protected void stopExistingDispatcher() |
| { |
| Dispatcher dispatcher = _dispatcher; |
| if (dispatcher != null) |
| { |
| dispatcher.setConnectionStopped(true); |
| } |
| } |
| |
| protected void suspendChannelIfNotClosing() throws QpidException |
| { |
| if (!(isClosed() || isClosing())) |
| { |
| suspendChannel(true); |
| } |
| } |
| |
| private void shutdownFlowControlNoAckTaskPool() |
| { |
| if (_flowControlNoAckTaskPool != null) |
| { |
| _flowControlNoAckTaskPool.shutdown(); |
| } |
| } |
| |
| } |
| |