| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq; |
| |
| import java.io.File; |
| import java.io.InputStream; |
| import java.io.Serializable; |
| import java.net.URL; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.jms.BytesMessage; |
| import javax.jms.Destination; |
| import javax.jms.IllegalStateException; |
| import javax.jms.InvalidDestinationException; |
| import javax.jms.InvalidSelectorException; |
| import javax.jms.JMSException; |
| import javax.jms.MapMessage; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.ObjectMessage; |
| import javax.jms.Queue; |
| import javax.jms.QueueBrowser; |
| import javax.jms.QueueReceiver; |
| import javax.jms.QueueSender; |
| import javax.jms.QueueSession; |
| import javax.jms.Session; |
| import javax.jms.StreamMessage; |
| import javax.jms.TemporaryQueue; |
| import javax.jms.TemporaryTopic; |
| import javax.jms.TextMessage; |
| import javax.jms.Topic; |
| import javax.jms.TopicPublisher; |
| import javax.jms.TopicSession; |
| import javax.jms.TopicSubscriber; |
| import javax.jms.TransactionRolledBackException; |
| |
| import org.apache.activemq.blob.BlobDownloader; |
| import org.apache.activemq.blob.BlobTransferPolicy; |
| import org.apache.activemq.blob.BlobUploader; |
| import org.apache.activemq.command.ActiveMQBlobMessage; |
| import org.apache.activemq.command.ActiveMQBytesMessage; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQMapMessage; |
| import org.apache.activemq.command.ActiveMQMessage; |
| import org.apache.activemq.command.ActiveMQObjectMessage; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQStreamMessage; |
| import org.apache.activemq.command.ActiveMQTempDestination; |
| import org.apache.activemq.command.ActiveMQTempQueue; |
| import org.apache.activemq.command.ActiveMQTempTopic; |
| import org.apache.activemq.command.ActiveMQTextMessage; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.command.Command; |
| import org.apache.activemq.command.CommandTypes; |
| import org.apache.activemq.command.ConsumerId; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.MessageDispatch; |
| import org.apache.activemq.command.MessageId; |
| import org.apache.activemq.command.ProducerId; |
| import org.apache.activemq.command.RemoveInfo; |
| import org.apache.activemq.command.Response; |
| import org.apache.activemq.command.SessionId; |
| import org.apache.activemq.command.SessionInfo; |
| import org.apache.activemq.command.TransactionId; |
| import org.apache.activemq.management.JMSSessionStatsImpl; |
| import org.apache.activemq.management.StatsCapable; |
| import org.apache.activemq.management.StatsImpl; |
| import org.apache.activemq.thread.Scheduler; |
| import org.apache.activemq.transaction.Synchronization; |
| import org.apache.activemq.usage.MemoryUsage; |
| import org.apache.activemq.util.Callback; |
| import org.apache.activemq.util.LongSequenceGenerator; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * <P> |
| * A <CODE>Session</CODE> object is a single-threaded context for producing |
| * and consuming messages. Although it may allocate provider resources outside |
| * the Java virtual machine (JVM), it is considered a lightweight JMS object. |
| * <P> |
| * A session serves several purposes: |
| * <UL> |
| * <LI>It is a factory for its message producers and consumers. |
| * <LI>It supplies provider-optimized message factories. |
| * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and |
| * <CODE>TemporaryQueues</CODE>. |
| * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> |
| * objects for those clients that need to dynamically manipulate |
| * provider-specific destination names. |
| * <LI>It supports a single series of transactions that combine work spanning |
| * its producers and consumers into atomic units. |
| * <LI>It defines a serial order for the messages it consumes and the messages |
| * it produces. |
| * <LI>It retains messages it consumes until they have been acknowledged. |
| * <LI>It serializes execution of message listeners registered with its message |
| * consumers. |
| * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. |
| * </UL> |
| * <P> |
| * A session can create and service multiple message producers and consumers. |
| * <P> |
| * One typical use is to have a thread block on a synchronous |
| * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then |
| * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. |
| * <P> |
| * If a client desires to have one thread produce messages while others consume |
| * them, the client should use a separate session for its producing thread. |
| * <P> |
| * Once a connection has been started, any session with one or more registered |
| * message listeners is dedicated to the thread of control that delivers |
| * messages to it. It is erroneous for client code to use this session or any of |
| * its constituent objects from another thread of control. The only exception to |
| * this rule is the use of the session or connection <CODE>close</CODE> |
| * method. |
| * <P> |
| * It should be easy for most clients to partition their work naturally into |
| * sessions. This model allows clients to start simply and incrementally add |
| * message processing complexity as their need for concurrency grows. |
| * <P> |
| * The <CODE>close</CODE> method is the only session method that can be called |
| * while some other session method is being executed in another thread. |
| * <P> |
| * A session may be specified as transacted. Each transacted session supports a |
| * single series of transactions. Each transaction groups a set of message sends |
| * and a set of message receives into an atomic unit of work. In effect, |
| * transactions organize a session's input message stream and output message |
| * stream into series of atomic units. When a transaction commits, its atomic |
| * unit of input is acknowledged and its associated atomic unit of output is |
| * sent. If a transaction rollback is done, the transaction's sent messages are |
| * destroyed and the session's input is automatically recovered. |
| * <P> |
| * The content of a transaction's input and output units is simply those |
| * messages that have been produced and consumed within the session's current |
| * transaction. |
| * <P> |
| * A transaction is completed using either its session's <CODE>commit</CODE> |
| * method or its session's <CODE>rollback </CODE> method. The completion of a |
| * session's current transaction automatically begins the next. The result is |
| * that a transacted session always has a current transaction within which its |
| * work is done. |
| * <P> |
| * The Java Transaction Service (JTS) or some other transaction monitor may be |
| * used to combine a session's transaction with transactions on other resources |
| * (databases, other JMS sessions, etc.). Since Java distributed transactions |
| * are controlled via the Java Transaction API (JTA), use of the session's |
| * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is |
| * prohibited. |
| * <P> |
| * The JMS API does not require support for JTA; however, it does define how a |
| * provider supplies this support. |
| * <P> |
| * Although it is also possible for a JMS client to handle distributed |
| * transactions directly, it is unlikely that many JMS clients will do this. |
| * Support for JTA in the JMS API is targeted at systems vendors who will be |
| * integrating the JMS API into their application server products. |
| * |
| * |
| * @see javax.jms.Session |
| * @see javax.jms.QueueSession |
| * @see javax.jms.TopicSession |
| * @see javax.jms.XASession |
| */ |
| public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { |
| |
| /** |
| * Only acknowledge an individual message - using message.acknowledge() |
| * as opposed to CLIENT_ACKNOWLEDGE which |
| * acknowledges all messages consumed by a session at when acknowledge() |
| * is called |
| */ |
| public static final int INDIVIDUAL_ACKNOWLEDGE = 4; |
| public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; |
| |
| public static interface DeliveryListener { |
| void beforeDelivery(ActiveMQSession session, Message msg); |
| |
| void afterDelivery(ActiveMQSession session, Message msg); |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); |
| private final ThreadPoolExecutor connectionExecutor; |
| |
| protected int acknowledgementMode; |
| protected final ActiveMQConnection connection; |
| protected final SessionInfo info; |
| protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); |
| protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); |
| protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); |
| protected final ActiveMQSessionExecutor executor; |
| protected final AtomicBoolean started = new AtomicBoolean(false); |
| |
| protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); |
| protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); |
| |
| protected boolean closed; |
| private volatile boolean synchronizationRegistered; |
| protected boolean asyncDispatch; |
| protected boolean sessionAsyncDispatch; |
| protected final boolean debug; |
| protected final Object sendMutex = new Object(); |
| protected final Object redeliveryGuard = new Object(); |
| |
| private final AtomicBoolean clearInProgress = new AtomicBoolean(); |
| |
| private MessageListener messageListener; |
| private final JMSSessionStatsImpl stats; |
| private TransactionContext transactionContext; |
| private DeliveryListener deliveryListener; |
| private MessageTransformer transformer; |
| private BlobTransferPolicy blobTransferPolicy; |
| private long lastDeliveredSequenceId = -2; |
| |
| /** |
| * Construct the Session |
| * |
| * @param connection |
| * @param sessionId |
| * @param acknowledgeMode n.b if transacted - the acknowledgeMode == |
| * Session.SESSION_TRANSACTED |
| * @param asyncDispatch |
| * @param sessionAsyncDispatch |
| * @throws JMSException on internal error |
| */ |
| protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { |
| this.debug = LOG.isDebugEnabled(); |
| this.connection = connection; |
| this.acknowledgementMode = acknowledgeMode; |
| this.asyncDispatch = asyncDispatch; |
| this.sessionAsyncDispatch = sessionAsyncDispatch; |
| this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); |
| setTransactionContext(new TransactionContext(connection)); |
| stats = new JMSSessionStatsImpl(producers, consumers); |
| this.connection.asyncSendPacket(info); |
| setTransformer(connection.getTransformer()); |
| setBlobTransferPolicy(connection.getBlobTransferPolicy()); |
| this.connectionExecutor=connection.getExecutor(); |
| this.executor = new ActiveMQSessionExecutor(this); |
| connection.addSession(this); |
| if (connection.isStarted()) { |
| start(); |
| } |
| |
| } |
| |
| protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { |
| this(connection, sessionId, acknowledgeMode, asyncDispatch, true); |
| } |
| |
| /** |
| * Sets the transaction context of the session. |
| * |
| * @param transactionContext - provides the means to control a JMS |
| * transaction. |
| */ |
| public void setTransactionContext(TransactionContext transactionContext) { |
| this.transactionContext = transactionContext; |
| } |
| |
| /** |
| * Returns the transaction context of the session. |
| * |
| * @return transactionContext - session's transaction context. |
| */ |
| public TransactionContext getTransactionContext() { |
| return transactionContext; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.activemq.management.StatsCapable#getStats() |
| */ |
| @Override |
| public StatsImpl getStats() { |
| return stats; |
| } |
| |
| /** |
| * Returns the session's statistics. |
| * |
| * @return stats - session's statistics. |
| */ |
| public JMSSessionStatsImpl getSessionStats() { |
| return stats; |
| } |
| |
| /** |
| * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> |
| * object is used to send a message containing a stream of uninterpreted |
| * bytes. |
| * |
| * @return the an ActiveMQBytesMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public BytesMessage createBytesMessage() throws JMSException { |
| ActiveMQBytesMessage message = new ActiveMQBytesMessage(); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> |
| * object is used to send a self-defining set of name-value pairs, where |
| * names are <CODE>String</CODE> objects and values are primitive values |
| * in the Java programming language. |
| * |
| * @return an ActiveMQMapMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public MapMessage createMapMessage() throws JMSException { |
| ActiveMQMapMessage message = new ActiveMQMapMessage(); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> |
| * interface is the root interface of all JMS messages. A |
| * <CODE>Message</CODE> object holds all the standard message header |
| * information. It can be sent when a message containing only header |
| * information is sufficient. |
| * |
| * @return an ActiveMQMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public Message createMessage() throws JMSException { |
| ActiveMQMessage message = new ActiveMQMessage(); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates an <CODE>ObjectMessage</CODE> object. An |
| * <CODE>ObjectMessage</CODE> object is used to send a message that |
| * contains a serializable Java object. |
| * |
| * @return an ActiveMQObjectMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public ObjectMessage createObjectMessage() throws JMSException { |
| ActiveMQObjectMessage message = new ActiveMQObjectMessage(); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates an initialized <CODE>ObjectMessage</CODE> object. An |
| * <CODE>ObjectMessage</CODE> object is used to send a message that |
| * contains a serializable Java object. |
| * |
| * @param object the object to use to initialize this message |
| * @return an ActiveMQObjectMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public ObjectMessage createObjectMessage(Serializable object) throws JMSException { |
| ActiveMQObjectMessage message = new ActiveMQObjectMessage(); |
| configureMessage(message); |
| message.setObject(object); |
| return message; |
| } |
| |
| /** |
| * Creates a <CODE>StreamMessage</CODE> object. A |
| * <CODE>StreamMessage</CODE> object is used to send a self-defining |
| * stream of primitive values in the Java programming language. |
| * |
| * @return an ActiveMQStreamMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public StreamMessage createStreamMessage() throws JMSException { |
| ActiveMQStreamMessage message = new ActiveMQStreamMessage(); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> |
| * object is used to send a message containing a <CODE>String</CODE> |
| * object. |
| * |
| * @return an ActiveMQTextMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public TextMessage createTextMessage() throws JMSException { |
| ActiveMQTextMessage message = new ActiveMQTextMessage(); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates an initialized <CODE>TextMessage</CODE> object. A |
| * <CODE>TextMessage</CODE> object is used to send a message containing a |
| * <CODE>String</CODE>. |
| * |
| * @param text the string used to initialize this message |
| * @return an ActiveMQTextMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| @Override |
| public TextMessage createTextMessage(String text) throws JMSException { |
| ActiveMQTextMessage message = new ActiveMQTextMessage(); |
| message.setText(text); |
| configureMessage(message); |
| return message; |
| } |
| |
| /** |
| * Creates an initialized <CODE>BlobMessage</CODE> object. A |
| * <CODE>BlobMessage</CODE> object is used to send a message containing a |
| * <CODE>URL</CODE> which points to some network addressible BLOB. |
| * |
| * @param url the network addressable URL used to pass directly to the |
| * consumer |
| * @return a BlobMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| public BlobMessage createBlobMessage(URL url) throws JMSException { |
| return createBlobMessage(url, false); |
| } |
| |
| /** |
| * Creates an initialized <CODE>BlobMessage</CODE> object. A |
| * <CODE>BlobMessage</CODE> object is used to send a message containing a |
| * <CODE>URL</CODE> which points to some network addressible BLOB. |
| * |
| * @param url the network addressable URL used to pass directly to the |
| * consumer |
| * @param deletedByBroker indicates whether or not the resource is deleted |
| * by the broker when the message is acknowledged |
| * @return a BlobMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { |
| ActiveMQBlobMessage message = new ActiveMQBlobMessage(); |
| configureMessage(message); |
| message.setURL(url); |
| message.setDeletedByBroker(deletedByBroker); |
| message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); |
| return message; |
| } |
| |
| /** |
| * Creates an initialized <CODE>BlobMessage</CODE> object. A |
| * <CODE>BlobMessage</CODE> object is used to send a message containing |
| * the <CODE>File</CODE> content. Before the message is sent the file |
| * conent will be uploaded to the broker or some other remote repository |
| * depending on the {@link #getBlobTransferPolicy()}. |
| * |
| * @param file the file to be uploaded to some remote repo (or the broker) |
| * depending on the strategy |
| * @return a BlobMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| public BlobMessage createBlobMessage(File file) throws JMSException { |
| ActiveMQBlobMessage message = new ActiveMQBlobMessage(); |
| configureMessage(message); |
| message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); |
| message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); |
| message.setDeletedByBroker(true); |
| message.setName(file.getName()); |
| return message; |
| } |
| |
| /** |
| * Creates an initialized <CODE>BlobMessage</CODE> object. A |
| * <CODE>BlobMessage</CODE> object is used to send a message containing |
| * the <CODE>File</CODE> content. Before the message is sent the file |
| * conent will be uploaded to the broker or some other remote repository |
| * depending on the {@link #getBlobTransferPolicy()}. <br/> |
| * <p> |
| * The caller of this method is responsible for closing the |
| * input stream that is used, however the stream can not be closed |
| * until <b>after</b> the message has been sent. To have this class |
| * manage the stream and close it automatically, use the method |
| * {@link ActiveMQSession#createBlobMessage(File)} |
| * |
| * @param in the stream to be uploaded to some remote repo (or the broker) |
| * depending on the strategy |
| * @return a BlobMessage |
| * @throws JMSException if the JMS provider fails to create this message due |
| * to some internal error. |
| */ |
| public BlobMessage createBlobMessage(InputStream in) throws JMSException { |
| ActiveMQBlobMessage message = new ActiveMQBlobMessage(); |
| configureMessage(message); |
| message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); |
| message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); |
| message.setDeletedByBroker(true); |
| return message; |
| } |
| |
| /** |
| * Indicates whether the session is in transacted mode. |
| * |
| * @return true if the session is in transacted mode |
| * @throws JMSException if there is some internal error. |
| */ |
| @Override |
| public boolean getTransacted() throws JMSException { |
| checkClosed(); |
| return isTransacted(); |
| } |
| |
| /** |
| * Returns the acknowledgement mode of the session. The acknowledgement mode |
| * is set at the time that the session is created. If the session is |
| * transacted, the acknowledgement mode is ignored. |
| * |
| * @return If the session is not transacted, returns the current |
| * acknowledgement mode for the session. If the session is |
| * transacted, returns SESSION_TRANSACTED. |
| * @throws JMSException |
| * @see javax.jms.Connection#createSession(boolean,int) |
| * @since 1.1 exception JMSException if there is some internal error. |
| */ |
| @Override |
| public int getAcknowledgeMode() throws JMSException { |
| checkClosed(); |
| return this.acknowledgementMode; |
| } |
| |
| /** |
| * Commits all messages done in this transaction and releases any locks |
| * currently held. |
| * |
| * @throws JMSException if the JMS provider fails to commit the transaction |
| * due to some internal error. |
| * @throws TransactionRolledBackException if the transaction is rolled back |
| * due to some internal error during commit. |
| * @throws javax.jms.IllegalStateException if the method is not called by a |
| * transacted session. |
| */ |
| @Override |
| public void commit() throws JMSException { |
| checkClosed(); |
| if (!getTransacted()) { |
| throw new javax.jms.IllegalStateException("Not a transacted session"); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); |
| } |
| transactionContext.commit(); |
| } |
| |
| /** |
| * Rolls back any messages done in this transaction and releases any locks |
| * currently held. |
| * |
| * @throws JMSException if the JMS provider fails to roll back the |
| * transaction due to some internal error. |
| * @throws javax.jms.IllegalStateException if the method is not called by a |
| * transacted session. |
| */ |
| @Override |
| public void rollback() throws JMSException { |
| checkClosed(); |
| if (!getTransacted()) { |
| throw new javax.jms.IllegalStateException("Not a transacted session"); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); |
| } |
| transactionContext.rollback(); |
| } |
| |
| /** |
| * Closes the session. |
| * <P> |
| * Since a provider may allocate some resources on behalf of a session |
| * outside the JVM, clients should close the resources when they are not |
| * needed. Relying on garbage collection to eventually reclaim these |
| * resources may not be timely enough. |
| * <P> |
| * There is no need to close the producers and consumers of a closed |
| * session. |
| * <P> |
| * This call will block until a <CODE>receive</CODE> call or message |
| * listener in progress has completed. A blocked message consumer |
| * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session |
| * is closed. |
| * <P> |
| * Closing a transacted session must roll back the transaction in progress. |
| * <P> |
| * This method is the only <CODE>Session</CODE> method that can be called |
| * concurrently. |
| * <P> |
| * Invoking any other <CODE>Session</CODE> method on a closed session must |
| * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a |
| * closed session must <I>not </I> throw an exception. |
| * |
| * @throws JMSException if the JMS provider fails to close the session due |
| * to some internal error. |
| */ |
| @Override |
| public void close() throws JMSException { |
| if (!closed) { |
| if (getTransactionContext().isInXATransaction()) { |
| if (!synchronizationRegistered) { |
| synchronizationRegistered = true; |
| getTransactionContext().addSynchronization(new Synchronization() { |
| |
| @Override |
| public void afterCommit() throws Exception { |
| doClose(); |
| synchronizationRegistered = false; |
| } |
| |
| @Override |
| public void afterRollback() throws Exception { |
| doClose(); |
| synchronizationRegistered = false; |
| } |
| }); |
| } |
| |
| } else { |
| doClose(); |
| } |
| } |
| } |
| |
| private void doClose() throws JMSException { |
| dispose(); |
| RemoveInfo removeCommand = info.createRemoveCommand(); |
| removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); |
| connection.asyncSendPacket(removeCommand); |
| } |
| |
| final AtomicInteger clearRequestsCounter = new AtomicInteger(0); |
| void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { |
| clearRequestsCounter.incrementAndGet(); |
| executor.clearMessagesInProgress(); |
| // we are called from inside the transport reconnection logic which involves us |
| // clearing all the connections' consumers dispatch and delivered lists. So rather |
| // than trying to grab a mutex (which could be already owned by the message listener |
| // calling the send or an ack) we allow it to complete in a separate thread via the |
| // scheduler and notify us via connection.transportInterruptionProcessingComplete() |
| // |
| // We must be careful though not to allow multiple calls to this method from a |
| // connection that is having issue becoming fully established from causing a large |
| // build up of scheduled tasks to clear the same consumers over and over. |
| if (consumers.isEmpty()) { |
| return; |
| } |
| |
| if (clearInProgress.compareAndSet(false, true)) { |
| for (final ActiveMQMessageConsumer consumer : consumers) { |
| consumer.inProgressClearRequired(); |
| transportInterruptionProcessingComplete.incrementAndGet(); |
| try { |
| connection.getScheduler().executeAfterDelay(new Runnable() { |
| @Override |
| public void run() { |
| consumer.clearMessagesInProgress(); |
| }}, 0l); |
| } catch (JMSException e) { |
| connection.onClientInternalException(e); |
| } |
| } |
| |
| try { |
| connection.getScheduler().executeAfterDelay(new Runnable() { |
| @Override |
| public void run() { |
| clearInProgress.set(false); |
| }}, 0l); |
| } catch (JMSException e) { |
| connection.onClientInternalException(e); |
| } |
| } |
| } |
| |
| void deliverAcks() { |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer consumer = iter.next(); |
| consumer.deliverAcks(); |
| } |
| } |
| |
| public synchronized void dispose() throws JMSException { |
| if (!closed) { |
| |
| try { |
| executor.close(); |
| |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer consumer = iter.next(); |
| consumer.setFailureError(connection.getFirstFailureError()); |
| consumer.dispose(); |
| lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); |
| } |
| consumers.clear(); |
| |
| for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { |
| ActiveMQMessageProducer producer = iter.next(); |
| producer.dispose(); |
| } |
| producers.clear(); |
| |
| try { |
| if (getTransactionContext().isInLocalTransaction()) { |
| rollback(); |
| } |
| } catch (JMSException e) { |
| } |
| |
| } finally { |
| connection.removeSession(this); |
| closed = true; |
| this.transactionContext = null; |
| } |
| } |
| } |
| |
| /** |
| * Checks that the session is not closed then configures the message |
| */ |
| protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { |
| checkClosed(); |
| message.setConnection(connection); |
| } |
| |
| /** |
| * Check if the session is closed. It is used for ensuring that the session |
| * is open before performing various operations. |
| * |
| * @throws IllegalStateException if the Session is closed |
| */ |
| protected void checkClosed() throws IllegalStateException { |
| if (closed) { |
| throw new IllegalStateException("The Session is closed"); |
| } |
| } |
| |
| /** |
| * Checks if the session is closed. |
| * |
| * @return true if the session is closed, false otherwise. |
| */ |
| public boolean isClosed() { |
| return closed; |
| } |
| |
| /** |
| * Stops message delivery in this session, and restarts message delivery |
| * with the oldest unacknowledged message. |
| * <P> |
| * All consumers deliver messages in a serial order. Acknowledging a |
| * received message automatically acknowledges all messages that have been |
| * delivered to the client. |
| * <P> |
| * Restarting a session causes it to take the following actions: |
| * <UL> |
| * <LI>Stop message delivery |
| * <LI>Mark all messages that might have been delivered but not |
| * acknowledged as "redelivered" |
| * <LI>Restart the delivery sequence including all unacknowledged messages |
| * that had been previously delivered. Redelivered messages do not have to |
| * be delivered in exactly their original delivery order. |
| * </UL> |
| * |
| * @throws JMSException if the JMS provider fails to stop and restart |
| * message delivery due to some internal error. |
| * @throws IllegalStateException if the method is called by a transacted |
| * session. |
| */ |
| @Override |
| public void recover() throws JMSException { |
| |
| checkClosed(); |
| if (getTransacted()) { |
| throw new IllegalStateException("This session is transacted"); |
| } |
| |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| c.rollback(); |
| } |
| |
| } |
| |
| /** |
| * Returns the session's distinguished message listener (optional). |
| * |
| * @return the message listener associated with this session |
| * @throws JMSException if the JMS provider fails to get the message |
| * listener due to an internal error. |
| * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) |
| * @see javax.jms.ServerSessionPool |
| * @see javax.jms.ServerSession |
| */ |
| @Override |
| public MessageListener getMessageListener() throws JMSException { |
| checkClosed(); |
| return this.messageListener; |
| } |
| |
| /** |
| * Sets the session's distinguished message listener (optional). |
| * <P> |
| * When the distinguished message listener is set, no other form of message |
| * receipt in the session can be used; however, all forms of sending |
| * messages are still supported. |
| * <P> |
| * If this session has been closed, then an {@link IllegalStateException} is |
| * thrown, if trying to set a new listener. However setting the listener |
| * to <tt>null</tt> is allowed, to clear the listener, even if this session |
| * has been closed prior. |
| * <P> |
| * This is an expert facility not used by regular JMS clients. |
| * |
| * @param listener the message listener to associate with this session |
| * @throws JMSException if the JMS provider fails to set the message |
| * listener due to an internal error. |
| * @see javax.jms.Session#getMessageListener() |
| * @see javax.jms.ServerSessionPool |
| * @see javax.jms.ServerSession |
| */ |
| @Override |
| public void setMessageListener(MessageListener listener) throws JMSException { |
| // only check for closed if we set a new listener, as we allow to clear |
| // the listener, such as when an application is shutting down, and is |
| // no longer using a message listener on this session |
| if (listener != null) { |
| checkClosed(); |
| } |
| this.messageListener = listener; |
| |
| if (listener != null) { |
| executor.setDispatchedBySessionPool(true); |
| } |
| } |
| |
| /** |
| * Optional operation, intended to be used only by Application Servers, not |
| * by ordinary JMS clients. |
| * |
| * @see javax.jms.ServerSession |
| */ |
| @Override |
| public void run() { |
| MessageDispatch messageDispatch; |
| while ((messageDispatch = executor.dequeueNoWait()) != null) { |
| final MessageDispatch md = messageDispatch; |
| |
| // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage |
| final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy(); |
| if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { |
| ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); |
| } |
| if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) { |
| ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages()); |
| ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages()); |
| } |
| |
| MessageAck earlyAck = null; |
| if (message.isExpired()) { |
| earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); |
| earlyAck.setFirstMessageId(message.getMessageId()); |
| } else if (connection.isDuplicate(ActiveMQSession.this, message)) { |
| LOG.debug("{} got duplicate: {}", this, message.getMessageId()); |
| earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1); |
| earlyAck.setFirstMessageId(md.getMessage().getMessageId()); |
| earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); |
| } |
| if (earlyAck != null) { |
| try { |
| asyncSendPacket(earlyAck); |
| } catch (Throwable t) { |
| LOG.error("error dispatching ack: {} ", earlyAck, t); |
| connection.onClientInternalException(t); |
| } finally { |
| continue; |
| } |
| } |
| |
| if (isClientAcknowledge()||isIndividualAcknowledge()) { |
| message.setAcknowledgeCallback(new Callback() { |
| @Override |
| public void execute() throws Exception { |
| } |
| }); |
| } |
| |
| if (deliveryListener != null) { |
| deliveryListener.beforeDelivery(this, message); |
| } |
| |
| md.setDeliverySequenceId(getNextDeliveryId()); |
| lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); |
| |
| final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); |
| |
| final AtomicBoolean afterDeliveryError = new AtomicBoolean(false); |
| /* |
| * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched. |
| * We dont want the after deliver being called after the redeliver as it may cause some weird stuff. |
| * */ |
| synchronized (redeliveryGuard) { |
| try { |
| ack.setFirstMessageId(md.getMessage().getMessageId()); |
| doStartTransaction(); |
| ack.setTransactionId(getTransactionContext().getTransactionId()); |
| if (ack.getTransactionId() != null) { |
| getTransactionContext().addSynchronization(new Synchronization() { |
| |
| final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); |
| |
| @Override |
| public void beforeEnd() throws Exception { |
| // validate our consumer so we don't push stale acks that get ignored |
| if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { |
| LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); |
| throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); |
| } |
| LOG.trace("beforeEnd ack {}", ack); |
| sendAck(ack); |
| } |
| |
| @Override |
| public void afterRollback() throws Exception { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("afterRollback {}", ack, new Throwable("here")); |
| } |
| // ensure we don't filter this as a duplicate |
| connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); |
| |
| // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect |
| if (clearRequestsCounter.get() > clearRequestCount) { |
| LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); |
| return; |
| } |
| |
| // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched |
| if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { |
| LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); |
| return; |
| } |
| |
| RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); |
| int redeliveryCounter = md.getMessage().getRedeliveryCounter(); |
| if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES |
| && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { |
| // We need to NACK the messages so that they get |
| // sent to the |
| // DLQ. |
| // Acknowledge the last message. |
| MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1); |
| ack.setFirstMessageId(md.getMessage().getMessageId()); |
| ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); |
| LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack); |
| asyncSendPacket(ack); |
| |
| } else { |
| |
| MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); |
| ack.setFirstMessageId(md.getMessage().getMessageId()); |
| asyncSendPacket(ack); |
| |
| // Figure out how long we should wait to resend |
| // this message. |
| long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); |
| for (int i = 0; i < redeliveryCounter; i++) { |
| redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); |
| } |
| |
| /* |
| * If we are a non blocking delivery then we need to stop the executor to avoid more |
| * messages being delivered, once the message is redelivered we can restart it. |
| * */ |
| if (!connection.isNonBlockingRedelivery()) { |
| LOG.debug("Blocking session until re-delivery..."); |
| executor.stop(); |
| } |
| |
| connection.getScheduler().executeAfterDelay(new Runnable() { |
| |
| @Override |
| public void run() { |
| /* |
| * wait for the first delivery to be complete, i.e. after delivery has been called. |
| * */ |
| synchronized (redeliveryGuard) { |
| /* |
| * If its non blocking then we can just dispatch in a new session. |
| * */ |
| if (connection.isNonBlockingRedelivery()) { |
| ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); |
| } else { |
| /* |
| * If there has been an error thrown during afterDelivery then the |
| * endpoint will be marked as dead so redelivery will fail (and eventually |
| * the session marked as stale), in this case we can only call dispatch |
| * which will create a new session with a new endpoint. |
| * */ |
| if (afterDeliveryError.get()) { |
| ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); |
| } else { |
| executor.executeFirst(md); |
| executor.start(); |
| } |
| } |
| } |
| } |
| }, redeliveryDelay); |
| } |
| md.getMessage().onMessageRolledBack(); |
| } |
| }); |
| } |
| |
| LOG.trace("{} onMessage({})", this, message.getMessageId()); |
| messageListener.onMessage(message); |
| |
| } catch (Throwable e) { |
| if (!isClosed()) { |
| LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e); |
| } |
| |
| if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) { |
| LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext()); |
| getTransactionContext().setRollbackOnly(true); |
| } |
| |
| // A problem while invoking the MessageListener does not |
| // in general indicate a problem with the connection to the broker, i.e. |
| // it will usually be sufficient to let the afterDelivery() method either |
| // commit or roll back in order to deal with the exception. |
| // However, we notify any registered client internal exception listener |
| // of the problem. |
| connection.onClientInternalException(e); |
| } finally { |
| if (ack.getTransactionId() == null) { |
| try { |
| asyncSendPacket(ack); |
| } catch (Throwable e) { |
| connection.onClientInternalException(e); |
| } |
| } |
| } |
| |
| if (deliveryListener != null) { |
| try { |
| deliveryListener.afterDelivery(this, message); |
| } catch (Throwable t) { |
| LOG.debug("Unable to call after delivery", t); |
| afterDeliveryError.set(true); |
| throw new RuntimeException(t); |
| } |
| } |
| } |
| /* |
| * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway. |
| * It also needs to be outside the redelivery guard. |
| * */ |
| try { |
| executor.waitForQueueRestart(); |
| } catch (InterruptedException ex) { |
| connection.onClientInternalException(ex); |
| } |
| } |
| } |
| |
| /** |
| * Creates a <CODE>MessageProducer</CODE> to send messages to the |
| * specified destination. |
| * <P> |
| * A client uses a <CODE>MessageProducer</CODE> object to send messages to |
| * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both |
| * inherit from <CODE>Destination</CODE>, they can be used in the |
| * destination parameter to create a <CODE>MessageProducer</CODE> object. |
| * |
| * @param destination the <CODE>Destination</CODE> to send to, or null if |
| * this is a producer which does not have a specified |
| * destination. |
| * @return the MessageProducer |
| * @throws JMSException if the session fails to create a MessageProducer due |
| * to some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @since 1.1 |
| */ |
| @Override |
| public MessageProducer createProducer(Destination destination) throws JMSException { |
| checkClosed(); |
| if (destination instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)destination; |
| return customDestination.createProducer(this); |
| } |
| int timeSendOut = connection.getSendTimeout(); |
| return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); |
| } |
| |
| /** |
| * Creates a <CODE>MessageConsumer</CODE> for the specified destination. |
| * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from |
| * <CODE>Destination</CODE>, they can be used in the destination |
| * parameter to create a <CODE>MessageConsumer</CODE>. |
| * |
| * @param destination the <CODE>Destination</CODE> to access. |
| * @return the MessageConsumer |
| * @throws JMSException if the session fails to create a consumer due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @since 1.1 |
| */ |
| @Override |
| public MessageConsumer createConsumer(Destination destination) throws JMSException { |
| return createConsumer(destination, (String) null); |
| } |
| |
| /** |
| * Creates a <CODE>MessageConsumer</CODE> for the specified destination, |
| * using a message selector. Since <CODE> Queue</CODE> and |
| * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they |
| * can be used in the destination parameter to create a |
| * <CODE>MessageConsumer</CODE>. |
| * <P> |
| * A client uses a <CODE>MessageConsumer</CODE> object to receive messages |
| * that have been sent to a destination. |
| * |
| * @param destination the <CODE>Destination</CODE> to access |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @return the MessageConsumer |
| * @throws JMSException if the session fails to create a MessageConsumer due |
| * to some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| * @since 1.1 |
| */ |
| @Override |
| public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { |
| return createConsumer(destination, messageSelector, false); |
| } |
| |
| /** |
| * Creates a <CODE>MessageConsumer</CODE> for the specified destination. |
| * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from |
| * <CODE>Destination</CODE>, they can be used in the destination |
| * parameter to create a <CODE>MessageConsumer</CODE>. |
| * |
| * @param destination the <CODE>Destination</CODE> to access. |
| * @param messageListener the listener to use for async consumption of messages |
| * @return the MessageConsumer |
| * @throws JMSException if the session fails to create a consumer due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @since 1.1 |
| */ |
| public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { |
| return createConsumer(destination, null, messageListener); |
| } |
| |
| /** |
| * Creates a <CODE>MessageConsumer</CODE> for the specified destination, |
| * using a message selector. Since <CODE> Queue</CODE> and |
| * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they |
| * can be used in the destination parameter to create a |
| * <CODE>MessageConsumer</CODE>. |
| * <P> |
| * A client uses a <CODE>MessageConsumer</CODE> object to receive messages |
| * that have been sent to a destination. |
| * |
| * @param destination the <CODE>Destination</CODE> to access |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @param messageListener the listener to use for async consumption of messages |
| * @return the MessageConsumer |
| * @throws JMSException if the session fails to create a MessageConsumer due |
| * to some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| * @since 1.1 |
| */ |
| public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { |
| return createConsumer(destination, messageSelector, false, messageListener); |
| } |
| |
| /** |
| * Creates <CODE>MessageConsumer</CODE> for the specified destination, |
| * using a message selector. This method can specify whether messages |
| * published by its own connection should be delivered to it, if the |
| * destination is a topic. |
| * <P> |
| * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from |
| * <CODE>Destination</CODE>, they can be used in the destination |
| * parameter to create a <CODE>MessageConsumer</CODE>. |
| * <P> |
| * A client uses a <CODE>MessageConsumer</CODE> object to receive messages |
| * that have been published to a destination. |
| * <P> |
| * In some cases, a connection may both publish and subscribe to a topic. |
| * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to |
| * inhibit the delivery of messages published by its own connection. The |
| * default value for this attribute is False. The <CODE>noLocal</CODE> |
| * value must be supported by destinations that are topics. |
| * |
| * @param destination the <CODE>Destination</CODE> to access |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @param noLocal - if true, and the destination is a topic, inhibits the |
| * delivery of messages published by its own connection. The |
| * behavior for <CODE>NoLocal</CODE> is not specified if |
| * the destination is a queue. |
| * @return the MessageConsumer |
| * @throws JMSException if the session fails to create a MessageConsumer due |
| * to some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| * @since 1.1 |
| */ |
| @Override |
| public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { |
| return createConsumer(destination, messageSelector, noLocal, null); |
| } |
| |
| /** |
| * Creates <CODE>MessageConsumer</CODE> for the specified destination, |
| * using a message selector. This method can specify whether messages |
| * published by its own connection should be delivered to it, if the |
| * destination is a topic. |
| * <P> |
| * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from |
| * <CODE>Destination</CODE>, they can be used in the destination |
| * parameter to create a <CODE>MessageConsumer</CODE>. |
| * <P> |
| * A client uses a <CODE>MessageConsumer</CODE> object to receive messages |
| * that have been published to a destination. |
| * <P> |
| * In some cases, a connection may both publish and subscribe to a topic. |
| * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to |
| * inhibit the delivery of messages published by its own connection. The |
| * default value for this attribute is False. The <CODE>noLocal</CODE> |
| * value must be supported by destinations that are topics. |
| * |
| * @param destination the <CODE>Destination</CODE> to access |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @param noLocal - if true, and the destination is a topic, inhibits the |
| * delivery of messages published by its own connection. The |
| * behavior for <CODE>NoLocal</CODE> is not specified if |
| * the destination is a queue. |
| * @param messageListener the listener to use for async consumption of messages |
| * @return the MessageConsumer |
| * @throws JMSException if the session fails to create a MessageConsumer due |
| * to some internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| * @since 1.1 |
| */ |
| public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { |
| checkClosed(); |
| |
| if (destination instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)destination; |
| return customDestination.createConsumer(this, messageSelector, noLocal); |
| } |
| |
| ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); |
| int prefetch = 0; |
| if (destination instanceof Topic) { |
| prefetch = prefetchPolicy.getTopicPrefetch(); |
| } else { |
| prefetch = prefetchPolicy.getQueuePrefetch(); |
| } |
| ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); |
| return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, |
| prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); |
| } |
| |
| /** |
| * Creates a queue identity given a <CODE>Queue</CODE> name. |
| * <P> |
| * This facility is provided for the rare cases where clients need to |
| * dynamically manipulate queue identity. It allows the creation of a queue |
| * identity with a provider-specific name. Clients that depend on this |
| * ability are not portable. |
| * <P> |
| * Note that this method is not for creating the physical queue. The |
| * physical creation of queues is an administrative task and is not to be |
| * initiated by the JMS API. The one exception is the creation of temporary |
| * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> |
| * method. |
| * |
| * @param queueName the name of this <CODE>Queue</CODE> |
| * @return a <CODE>Queue</CODE> with the given name |
| * @throws JMSException if the session fails to create a queue due to some |
| * internal error. |
| * @since 1.1 |
| */ |
| @Override |
| public Queue createQueue(String queueName) throws JMSException { |
| checkClosed(); |
| if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { |
| return new ActiveMQTempQueue(queueName); |
| } |
| return new ActiveMQQueue(queueName); |
| } |
| |
| /** |
| * Creates a topic identity given a <CODE>Topic</CODE> name. |
| * <P> |
| * This facility is provided for the rare cases where clients need to |
| * dynamically manipulate topic identity. This allows the creation of a |
| * topic identity with a provider-specific name. Clients that depend on this |
| * ability are not portable. |
| * <P> |
| * Note that this method is not for creating the physical topic. The |
| * physical creation of topics is an administrative task and is not to be |
| * initiated by the JMS API. The one exception is the creation of temporary |
| * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> |
| * method. |
| * |
| * @param topicName the name of this <CODE>Topic</CODE> |
| * @return a <CODE>Topic</CODE> with the given name |
| * @throws JMSException if the session fails to create a topic due to some |
| * internal error. |
| * @since 1.1 |
| */ |
| @Override |
| public Topic createTopic(String topicName) throws JMSException { |
| checkClosed(); |
| if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { |
| return new ActiveMQTempTopic(topicName); |
| } |
| return new ActiveMQTopic(topicName); |
| } |
| |
| /** |
| * Creates a durable subscriber to the specified topic. |
| * <P> |
| * If a client needs to receive all the messages published on a topic, |
| * including the ones published while the subscriber is inactive, it uses a |
| * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a |
| * record of this durable subscription and insures that all messages from |
| * the topic's publishers are retained until they are acknowledged by this |
| * durable subscriber or they have expired. |
| * <P> |
| * Sessions with durable subscribers must always provide the same client |
| * identifier. In addition, each client must specify a name that uniquely |
| * identifies (within client identifier) each durable subscription it |
| * creates. Only one session at a time can have a |
| * <CODE>TopicSubscriber</CODE> for a particular durable subscription. |
| * <P> |
| * A client can change an existing durable subscription by creating a |
| * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic |
| * and/or message selector. Changing a durable subscriber is equivalent to |
| * unsubscribing (deleting) the old one and creating a new one. |
| * <P> |
| * In some cases, a connection may both publish and subscribe to a topic. |
| * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to |
| * inhibit the delivery of messages published by its own connection. The |
| * default value for this attribute is false. |
| * |
| * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to |
| * @param name the name used to identify this subscription |
| * @return the TopicSubscriber |
| * @throws JMSException if the session fails to create a subscriber due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid topic is specified. |
| * @since 1.1 |
| */ |
| @Override |
| public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { |
| checkClosed(); |
| return createDurableSubscriber(topic, name, null, false); |
| } |
| |
| /** |
| * Creates a durable subscriber to the specified topic, using a message |
| * selector and specifying whether messages published by its own connection |
| * should be delivered to it. |
| * <P> |
| * If a client needs to receive all the messages published on a topic, |
| * including the ones published while the subscriber is inactive, it uses a |
| * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a |
| * record of this durable subscription and insures that all messages from |
| * the topic's publishers are retained until they are acknowledged by this |
| * durable subscriber or they have expired. |
| * <P> |
| * Sessions with durable subscribers must always provide the same client |
| * identifier. In addition, each client must specify a name which uniquely |
| * identifies (within client identifier) each durable subscription it |
| * creates. Only one session at a time can have a |
| * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An |
| * inactive durable subscriber is one that exists but does not currently |
| * have a message consumer associated with it. |
| * <P> |
| * A client can change an existing durable subscription by creating a |
| * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic |
| * and/or message selector. Changing a durable subscriber is equivalent to |
| * unsubscribing (deleting) the old one and creating a new one. |
| * |
| * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to |
| * @param name the name used to identify this subscription |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @param noLocal if set, inhibits the delivery of messages published by its |
| * own connection |
| * @return the Queue Browser |
| * @throws JMSException if the session fails to create a subscriber due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid topic is specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| * @since 1.1 |
| */ |
| @Override |
| public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { |
| checkClosed(); |
| |
| if (topic == null) { |
| throw new InvalidDestinationException("Topic cannot be null"); |
| } |
| |
| if (topic instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)topic; |
| return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); |
| } |
| |
| connection.checkClientIDWasManuallySpecified(); |
| ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); |
| int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); |
| int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); |
| return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, |
| noLocal, false, asyncDispatch); |
| } |
| |
| /** |
| * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on |
| * the specified queue. |
| * |
| * @param queue the <CODE>queue</CODE> to access |
| * @return the Queue Browser |
| * @throws JMSException if the session fails to create a browser due to some |
| * internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified |
| * @since 1.1 |
| */ |
| @Override |
| public QueueBrowser createBrowser(Queue queue) throws JMSException { |
| checkClosed(); |
| return createBrowser(queue, null); |
| } |
| |
| /** |
| * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on |
| * the specified queue using a message selector. |
| * |
| * @param queue the <CODE>queue</CODE> to access |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @return the Queue Browser |
| * @throws JMSException if the session fails to create a browser due to some |
| * internal error. |
| * @throws InvalidDestinationException if an invalid destination is |
| * specified |
| * @throws InvalidSelectorException if the message selector is invalid. |
| * @since 1.1 |
| */ |
| @Override |
| public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { |
| checkClosed(); |
| return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); |
| } |
| |
| /** |
| * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that |
| * of the <CODE>Connection</CODE> unless it is deleted earlier. |
| * |
| * @return a temporary queue identity |
| * @throws JMSException if the session fails to create a temporary queue due |
| * to some internal error. |
| * @since 1.1 |
| */ |
| @Override |
| public TemporaryQueue createTemporaryQueue() throws JMSException { |
| checkClosed(); |
| return (TemporaryQueue)connection.createTempDestination(false); |
| } |
| |
| /** |
| * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that |
| * of the <CODE>Connection</CODE> unless it is deleted earlier. |
| * |
| * @return a temporary topic identity |
| * @throws JMSException if the session fails to create a temporary topic due |
| * to some internal error. |
| * @since 1.1 |
| */ |
| @Override |
| public TemporaryTopic createTemporaryTopic() throws JMSException { |
| checkClosed(); |
| return (TemporaryTopic)connection.createTempDestination(true); |
| } |
| |
| /** |
| * Creates a <CODE>QueueReceiver</CODE> object to receive messages from |
| * the specified queue. |
| * |
| * @param queue the <CODE>Queue</CODE> to access |
| * @return a new QueueBrowser instance. |
| * @throws JMSException if the session fails to create a receiver due to |
| * some internal error. |
| * @throws JMSException |
| * @throws InvalidDestinationException if an invalid queue is specified. |
| */ |
| @Override |
| public QueueReceiver createReceiver(Queue queue) throws JMSException { |
| checkClosed(); |
| return createReceiver(queue, null); |
| } |
| |
| /** |
| * Creates a <CODE>QueueReceiver</CODE> object to receive messages from |
| * the specified queue using a message selector. |
| * |
| * @param queue the <CODE>Queue</CODE> to access |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @return QueueReceiver |
| * @throws JMSException if the session fails to create a receiver due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid queue is specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| */ |
| @Override |
| public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { |
| checkClosed(); |
| |
| if (queue instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)queue; |
| return customDestination.createReceiver(this, messageSelector); |
| } |
| |
| ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); |
| return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), |
| prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); |
| } |
| |
| /** |
| * Creates a <CODE>QueueSender</CODE> object to send messages to the |
| * specified queue. |
| * |
| * @param queue the <CODE>Queue</CODE> to access, or null if this is an |
| * unidentified producer |
| * @return QueueSender |
| * @throws JMSException if the session fails to create a sender due to some |
| * internal error. |
| * @throws InvalidDestinationException if an invalid queue is specified. |
| */ |
| @Override |
| public QueueSender createSender(Queue queue) throws JMSException { |
| checkClosed(); |
| if (queue instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)queue; |
| return customDestination.createSender(this); |
| } |
| int timeSendOut = connection.getSendTimeout(); |
| return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); |
| } |
| |
| /** |
| * Creates a nondurable subscriber to the specified topic. <p/> |
| * <P> |
| * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages |
| * that have been published to a topic. <p/> |
| * <P> |
| * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They |
| * receive only messages that are published while they are active. <p/> |
| * <P> |
| * In some cases, a connection may both publish and subscribe to a topic. |
| * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to |
| * inhibit the delivery of messages published by its own connection. The |
| * default value for this attribute is false. |
| * |
| * @param topic the <CODE>Topic</CODE> to subscribe to |
| * @return TopicSubscriber |
| * @throws JMSException if the session fails to create a subscriber due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid topic is specified. |
| */ |
| @Override |
| public TopicSubscriber createSubscriber(Topic topic) throws JMSException { |
| checkClosed(); |
| return createSubscriber(topic, null, false); |
| } |
| |
| /** |
| * Creates a nondurable subscriber to the specified topic, using a message |
| * selector or specifying whether messages published by its own connection |
| * should be delivered to it. <p/> |
| * <P> |
| * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages |
| * that have been published to a topic. <p/> |
| * <P> |
| * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They |
| * receive only messages that are published while they are active. <p/> |
| * <P> |
| * Messages filtered out by a subscriber's message selector will never be |
| * delivered to the subscriber. From the subscriber's perspective, they do |
| * not exist. <p/> |
| * <P> |
| * In some cases, a connection may both publish and subscribe to a topic. |
| * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to |
| * inhibit the delivery of messages published by its own connection. The |
| * default value for this attribute is false. |
| * |
| * @param topic the <CODE>Topic</CODE> to subscribe to |
| * @param messageSelector only messages with properties matching the message |
| * selector expression are delivered. A value of null or an |
| * empty string indicates that there is no message selector |
| * for the message consumer. |
| * @param noLocal if set, inhibits the delivery of messages published by its |
| * own connection |
| * @return TopicSubscriber |
| * @throws JMSException if the session fails to create a subscriber due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid topic is specified. |
| * @throws InvalidSelectorException if the message selector is invalid. |
| */ |
| @Override |
| public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { |
| checkClosed(); |
| |
| if (topic instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)topic; |
| return customDestination.createSubscriber(this, messageSelector, noLocal); |
| } |
| |
| ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); |
| return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy |
| .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); |
| } |
| |
| /** |
| * Creates a publisher for the specified topic. <p/> |
| * <P> |
| * A client uses a <CODE>TopicPublisher</CODE> object to publish messages |
| * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on |
| * a topic, it defines a new sequence of messages that have no ordering |
| * relationship with the messages it has previously sent. |
| * |
| * @param topic the <CODE>Topic</CODE> to publish to, or null if this is |
| * an unidentified producer |
| * @return TopicPublisher |
| * @throws JMSException if the session fails to create a publisher due to |
| * some internal error. |
| * @throws InvalidDestinationException if an invalid topic is specified. |
| */ |
| @Override |
| public TopicPublisher createPublisher(Topic topic) throws JMSException { |
| checkClosed(); |
| |
| if (topic instanceof CustomDestination) { |
| CustomDestination customDestination = (CustomDestination)topic; |
| return customDestination.createPublisher(this); |
| } |
| int timeSendOut = connection.getSendTimeout(); |
| return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); |
| } |
| |
| /** |
| * Unsubscribes a durable subscription that has been created by a client. |
| * <P> |
| * This method deletes the state being maintained on behalf of the |
| * subscriber by its provider. |
| * <P> |
| * It is erroneous for a client to delete a durable subscription while there |
| * is an active <CODE>MessageConsumer </CODE> or |
| * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed |
| * message is part of a pending transaction or has not been acknowledged in |
| * the session. |
| * |
| * @param name the name used to identify this subscription |
| * @throws JMSException if the session fails to unsubscribe to the durable |
| * subscription due to some internal error. |
| * @throws InvalidDestinationException if an invalid subscription name is |
| * specified. |
| * @since 1.1 |
| */ |
| @Override |
| public void unsubscribe(String name) throws JMSException { |
| checkClosed(); |
| connection.unsubscribe(name); |
| } |
| |
| @Override |
| public void dispatch(MessageDispatch messageDispatch) { |
| try { |
| executor.execute(messageDispatch); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| connection.onClientInternalException(e); |
| } |
| } |
| |
| /** |
| * Acknowledges all consumed messages of the session of this consumed |
| * message. |
| * <P> |
| * All consumed JMS messages support the <CODE>acknowledge</CODE> method |
| * for use when a client has specified that its JMS session's consumed |
| * messages are to be explicitly acknowledged. By invoking |
| * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges |
| * all messages consumed by the session that the message was delivered to. |
| * <P> |
| * Calls to <CODE>acknowledge</CODE> are ignored for both transacted |
| * sessions and sessions specified to use implicit acknowledgement modes. |
| * <P> |
| * A client may individually acknowledge each message as it is consumed, or |
| * it may choose to acknowledge messages as an application-defined group |
| * (which is done by calling acknowledge on the last received message of the |
| * group, thereby acknowledging all messages consumed by the session.) |
| * <P> |
| * Messages that have been received but not acknowledged may be redelivered. |
| * |
| * @throws JMSException if the JMS provider fails to acknowledge the |
| * messages due to some internal error. |
| * @throws javax.jms.IllegalStateException if this method is called on a |
| * closed session. |
| * @see javax.jms.Session#CLIENT_ACKNOWLEDGE |
| */ |
| public void acknowledge() throws JMSException { |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| c.acknowledge(); |
| } |
| } |
| |
| /** |
| * Add a message consumer. |
| * |
| * @param consumer - message consumer. |
| * @throws JMSException |
| */ |
| protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { |
| this.consumers.add(consumer); |
| if (consumer.isDurableSubscriber()) { |
| stats.onCreateDurableSubscriber(); |
| } |
| this.connection.addDispatcher(consumer.getConsumerId(), this); |
| } |
| |
| /** |
| * Remove the message consumer. |
| * |
| * @param consumer - consumer to be removed. |
| * @throws JMSException |
| */ |
| protected void removeConsumer(ActiveMQMessageConsumer consumer) { |
| this.connection.removeDispatcher(consumer.getConsumerId()); |
| if (consumer.isDurableSubscriber()) { |
| stats.onRemoveDurableSubscriber(); |
| } |
| this.consumers.remove(consumer); |
| this.connection.removeDispatcher(consumer); |
| } |
| |
| /** |
| * Adds a message producer. |
| * |
| * @param producer - message producer to be added. |
| * @throws JMSException |
| */ |
| protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { |
| this.producers.add(producer); |
| this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); |
| } |
| |
| /** |
| * Removes a message producer. |
| * |
| * @param producer - message producer to be removed. |
| * @throws JMSException |
| */ |
| protected void removeProducer(ActiveMQMessageProducer producer) { |
| this.connection.removeProducer(producer.getProducerInfo().getProducerId()); |
| this.producers.remove(producer); |
| } |
| |
| /** |
| * Start this Session. |
| * |
| * @throws JMSException |
| */ |
| protected void start() throws JMSException { |
| started.set(true); |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| c.start(); |
| } |
| executor.start(); |
| } |
| |
| /** |
| * Stops this session. |
| * |
| * @throws JMSException |
| */ |
| protected void stop() throws JMSException { |
| |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| c.stop(); |
| } |
| |
| started.set(false); |
| executor.stop(); |
| } |
| |
| /** |
| * Returns the session id. |
| * |
| * @return value - session id. |
| */ |
| protected SessionId getSessionId() { |
| return info.getSessionId(); |
| } |
| |
| /** |
| * @return a unique ConsumerId instance. |
| */ |
| protected ConsumerId getNextConsumerId() { |
| return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); |
| } |
| |
| /** |
| * @return a unique ProducerId instance. |
| */ |
| protected ProducerId getNextProducerId() { |
| return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); |
| } |
| |
| /** |
| * Sends the message for dispatch by the broker. |
| * |
| * @param producer - message producer. |
| * @param destination - message destination. |
| * @param message - message to be sent. |
| * @param deliveryMode - JMS message delivery mode. |
| * @param priority - message priority. |
| * @param timeToLive - message expiration. |
| * @param producerWindow |
| * @param onComplete |
| * @throws JMSException |
| */ |
| protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, |
| MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { |
| |
| checkClosed(); |
| if (destination.isTemporary() && connection.isDeleted(destination)) { |
| throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); |
| } |
| synchronized (sendMutex) { |
| // tell the Broker we are about to start a new transaction |
| doStartTransaction(); |
| if (transactionContext.isRollbackOnly()) { |
| throw new IllegalStateException("transaction marked rollback only"); |
| } |
| TransactionId txid = transactionContext.getTransactionId(); |
| long sequenceNumber = producer.getMessageSequence(); |
| |
| //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 |
| message.setJMSDeliveryMode(deliveryMode); |
| long expiration = 0L; |
| if (!producer.getDisableMessageTimestamp()) { |
| long timeStamp = System.currentTimeMillis(); |
| message.setJMSTimestamp(timeStamp); |
| if (timeToLive > 0) { |
| expiration = timeToLive + timeStamp; |
| } |
| } |
| message.setJMSExpiration(expiration); |
| message.setJMSPriority(priority); |
| message.setJMSRedelivered(false); |
| |
| // transform to our own message format here |
| ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); |
| msg.setDestination(destination); |
| msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); |
| |
| // Set the message id. |
| if (msg != message) { |
| message.setJMSMessageID(msg.getMessageId().toString()); |
| // Make sure the JMS destination is set on the foreign messages too. |
| message.setJMSDestination(destination); |
| } |
| //clear the brokerPath in case we are re-sending this message |
| msg.setBrokerPath(null); |
| |
| msg.setTransactionId(txid); |
| if (connection.isCopyMessageOnSend()) { |
| msg = (ActiveMQMessage)msg.copy(); |
| } |
| msg.setConnection(connection); |
| msg.onSend(); |
| msg.setProducerId(msg.getMessageId().getProducerId()); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(getSessionId() + " sending message: " + msg); |
| } |
| if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { |
| this.connection.asyncSendPacket(msg); |
| if (producerWindow != null) { |
| // Since we defer lots of the marshaling till we hit the |
| // wire, this might not |
| // provide and accurate size. We may change over to doing |
| // more aggressive marshaling, |
| // to get more accurate sizes.. this is more important once |
| // users start using producer window |
| // flow control. |
| int size = msg.getSize(); |
| producerWindow.increaseUsage(size); |
| } |
| } else { |
| if (sendTimeout > 0 && onComplete==null) { |
| this.connection.syncSendPacket(msg,sendTimeout); |
| }else { |
| this.connection.syncSendPacket(msg, onComplete); |
| } |
| } |
| |
| } |
| } |
| |
| /** |
| * Send TransactionInfo to indicate transaction has started |
| * |
| * @throws JMSException if some internal error occurs |
| */ |
| protected void doStartTransaction() throws JMSException { |
| if (getTransacted() && !transactionContext.isInXATransaction()) { |
| transactionContext.begin(); |
| } |
| } |
| |
| /** |
| * Checks whether the session has unconsumed messages. |
| * |
| * @return true - if there are unconsumed messages. |
| */ |
| public boolean hasUncomsumedMessages() { |
| return executor.hasUncomsumedMessages(); |
| } |
| |
| /** |
| * Checks whether the session uses transactions. |
| * |
| * @return true - if the session uses transactions. |
| */ |
| public boolean isTransacted() { |
| return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); |
| } |
| |
| /** |
| * Checks whether the session used client acknowledgment. |
| * |
| * @return true - if the session uses client acknowledgment. |
| */ |
| protected boolean isClientAcknowledge() { |
| return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; |
| } |
| |
| /** |
| * Checks whether the session used auto acknowledgment. |
| * |
| * @return true - if the session uses client acknowledgment. |
| */ |
| public boolean isAutoAcknowledge() { |
| return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; |
| } |
| |
| /** |
| * Checks whether the session used dup ok acknowledgment. |
| * |
| * @return true - if the session uses client acknowledgment. |
| */ |
| public boolean isDupsOkAcknowledge() { |
| return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; |
| } |
| |
| public boolean isIndividualAcknowledge(){ |
| return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; |
| } |
| |
| /** |
| * Returns the message delivery listener. |
| * |
| * @return deliveryListener - message delivery listener. |
| */ |
| public DeliveryListener getDeliveryListener() { |
| return deliveryListener; |
| } |
| |
| /** |
| * Sets the message delivery listener. |
| * |
| * @param deliveryListener - message delivery listener. |
| */ |
| public void setDeliveryListener(DeliveryListener deliveryListener) { |
| this.deliveryListener = deliveryListener; |
| } |
| |
| /** |
| * Returns the SessionInfo bean. |
| * |
| * @return info - SessionInfo bean. |
| * @throws JMSException |
| */ |
| protected SessionInfo getSessionInfo() throws JMSException { |
| SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); |
| return info; |
| } |
| |
| /** |
| * Send the asynchronous command. |
| * |
| * @param command - command to be executed. |
| * @throws JMSException |
| */ |
| public void asyncSendPacket(Command command) throws JMSException { |
| connection.asyncSendPacket(command); |
| } |
| |
| /** |
| * Send the synchronous command. |
| * |
| * @param command - command to be executed. |
| * @return Response |
| * @throws JMSException |
| */ |
| public Response syncSendPacket(Command command) throws JMSException { |
| return connection.syncSendPacket(command); |
| } |
| |
| public long getNextDeliveryId() { |
| return deliveryIdGenerator.getNextSequenceId(); |
| } |
| |
| public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { |
| |
| List<MessageDispatch> c = unconsumedMessages.removeAll(); |
| Collections.reverse(c); |
| |
| for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { |
| MessageDispatch md = iter.next(); |
| executor.executeFirst(md); |
| } |
| |
| } |
| |
| public boolean isRunning() { |
| return started.get(); |
| } |
| |
| public boolean isAsyncDispatch() { |
| return asyncDispatch; |
| } |
| |
| public void setAsyncDispatch(boolean asyncDispatch) { |
| this.asyncDispatch = asyncDispatch; |
| } |
| |
| /** |
| * @return Returns the sessionAsyncDispatch. |
| */ |
| public boolean isSessionAsyncDispatch() { |
| return sessionAsyncDispatch; |
| } |
| |
| /** |
| * @param sessionAsyncDispatch The sessionAsyncDispatch to set. |
| */ |
| public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { |
| this.sessionAsyncDispatch = sessionAsyncDispatch; |
| } |
| |
| public MessageTransformer getTransformer() { |
| return transformer; |
| } |
| |
| public ActiveMQConnection getConnection() { |
| return connection; |
| } |
| |
| /** |
| * Sets the transformer used to transform messages before they are sent on |
| * to the JMS bus or when they are received from the bus but before they are |
| * delivered to the JMS client |
| */ |
| public void setTransformer(MessageTransformer transformer) { |
| this.transformer = transformer; |
| } |
| |
| public BlobTransferPolicy getBlobTransferPolicy() { |
| return blobTransferPolicy; |
| } |
| |
| /** |
| * Sets the policy used to describe how out-of-band BLOBs (Binary Large |
| * OBjects) are transferred from producers to brokers to consumers |
| */ |
| public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { |
| this.blobTransferPolicy = blobTransferPolicy; |
| } |
| |
| public List<MessageDispatch> getUnconsumedMessages() { |
| return executor.getUnconsumedMessages(); |
| } |
| |
| @Override |
| public String toString() { |
| return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex; |
| } |
| |
| public void checkMessageListener() throws JMSException { |
| if (messageListener != null) { |
| throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); |
| } |
| for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { |
| ActiveMQMessageConsumer consumer = i.next(); |
| if (consumer.hasMessageListener()) { |
| throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); |
| } |
| } |
| } |
| |
| protected void setOptimizeAcknowledge(boolean value) { |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| c.setOptimizeAcknowledge(value); |
| } |
| } |
| |
| protected void setPrefetchSize(ConsumerId id, int prefetch) { |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| if (c.getConsumerId().equals(id)) { |
| c.setPrefetchSize(prefetch); |
| break; |
| } |
| } |
| } |
| |
| protected void close(ConsumerId id) { |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| if (c.getConsumerId().equals(id)) { |
| try { |
| c.close(); |
| } catch (JMSException e) { |
| LOG.warn("Exception closing consumer", e); |
| } |
| LOG.warn("Closed consumer on Command, " + id); |
| break; |
| } |
| } |
| } |
| |
| public boolean isInUse(ActiveMQTempDestination destination) { |
| for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { |
| ActiveMQMessageConsumer c = iter.next(); |
| if (c.isInUse(destination)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * highest sequence id of the last message delivered by this session. |
| * Passed to the broker in the close command, maintained by dispose() |
| * @return lastDeliveredSequenceId |
| */ |
| public long getLastDeliveredSequenceId() { |
| return lastDeliveredSequenceId; |
| } |
| |
| protected void sendAck(MessageAck ack) throws JMSException { |
| sendAck(ack,false); |
| } |
| |
| protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { |
| if (lazy || connection.isSendAcksAsync() || getTransacted()) { |
| asyncSendPacket(ack); |
| } else { |
| syncSendPacket(ack); |
| } |
| } |
| |
| protected Scheduler getScheduler() throws JMSException { |
| return this.connection.getScheduler(); |
| } |
| |
| protected ThreadPoolExecutor getConnectionExecutor() { |
| return this.connectionExecutor; |
| } |
| } |