blob: 5d11fb6d08acaa25a4e71a050c8a9ecc98044b8f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.XAConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.management.JMSConnectionStatsImpl;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
public static int DEFAULT_THREAD_POOL_SIZE = 1000;
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<>();
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true;
private TaskRunnerFactory sessionTaskRunner;
private final ThreadPoolExecutor executor;
// Connection state variables
private final ConnectionInfo info;
private ExceptionListener exceptionListener;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean clientIDSet;
private boolean isConnectionInfoSentToBroker;
private boolean userSpecifiedClientID;
// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private BlobTransferPolicy blobTransferPolicy;
private RedeliveryPolicyMap redeliveryPolicyMap;
private MessageTransformer transformer;
private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean alwaysSyncSend;
private int closeTimeout = 15000;
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
private boolean sendAcksAsync=true;
private boolean checkForDuplicates = true;
private boolean queueOnlyConnection = false;
private boolean consumerExpiryCheckEnabled = true;
private final Transport transport;
private final IdGenerator clientIdGenerator;
private final JMSStatsImpl factoryStats;
private final JMSConnectionStatsImpl stats;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean transportFailed = new AtomicBoolean(false);
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<>();
// Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<>();
private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<>();
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
private AdvisoryConsumer advisoryConsumer;
private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
private BrokerInfo brokerInfo;
private IOException firstFailureError;
private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private final long timeCreated;
private final ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
private long consumerFailoverRedeliveryWaitPeriod;
private volatile Scheduler scheduler;
private final Object schedulerLock = new Object();
private boolean messagePrioritySupported = false;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private boolean rmIdFromConnectionId = false;
private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
private RejectedExecutionHandler rejectedTaskHandler = null;
private List<String> trustedPackages = new ArrayList<>();
private boolean trustAllPackages = false;
private int connectResponseTimeout;
/**
* Construct an <code>ActiveMQConnection</code>
*
* @param transport
* @param factoryStats
* @throws Exception
*/
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;
// Configure a single threaded executor who's core thread can timeout if
// idle
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
//Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
//thread.setDaemon(true);
return thread;
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
this.transport.setTransportListener(this);
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
protected void setUserName(String userName) {
this.info.setUserName(userName);
}
protected void setPassword(String password) {
this.info.setPassword(password);
}
/**
* A static helper method to create a new connection
*
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
return (ActiveMQConnection)factory.createConnection();
}
/**
* A static helper method to create a new connection
*
* @param uri
* @return and ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
return (ActiveMQConnection)factory.createConnection();
}
/**
* A static helper method to create a new connection
*
* @param user
* @param password
* @param uri
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
return (ActiveMQConnection)factory.createConnection();
}
/**
* @return a number unique for this connection
*/
public JMSConnectionStatsImpl getConnectionStats() {
return stats;
}
/**
* Creates a <CODE>Session</CODE> object.
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
* session is transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created session
* @throws JMSException if the <CODE>Connection</CODE> object fails to
* create a session due to some internal error or lack of
* support for the specific transaction and acknowledgement
* mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
if (!transacted) {
if (acknowledgeMode == Session.SESSION_TRANSACTED) {
throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
} else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
"Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
}
}
return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : acknowledgeMode, isDispatchAsync(), isAlwaysSessionAsync());
}
/**
* @return sessionId
*/
protected SessionId getNextSessionId() {
return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
}
/**
* Gets the client identifier for this connection.
* <P>
* This value is specific to the JMS provider. It is either preconfigured by
* an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
* dynamically by the application by calling the <code>setClientID</code>
* method.
*
* @return the unique client identifier
* @throws JMSException if the JMS provider fails to return the client ID
* for this connection due to some internal error.
*/
@Override
public String getClientID() throws JMSException {
checkClosedOrFailed();
return this.info.getClientId();
}
/**
* Sets the client identifier for this connection.
* <P>
* The preferred way to assign a JMS client's client identifier is for it to
* be configured in a client-specific <CODE>ConnectionFactory</CODE>
* object and transparently assigned to the <CODE>Connection</CODE> object
* it creates.
* <P>
* Alternatively, a client can set a connection's client identifier using a
* provider-specific value. The facility to set a connection's client
* identifier explicitly is not a mechanism for overriding the identifier
* that has been administratively configured. It is provided for the case
* where no administratively specified identifier exists. If one does exist,
* an attempt to change it by setting it must throw an
* <CODE>IllegalStateException</CODE>. If a client sets the client
* identifier explicitly, it must do so immediately after it creates the
* connection and before any other action on the connection is taken. After
* this point, setting the client identifier is a programming error that
* should throw an <CODE>IllegalStateException</CODE>.
* <P>
* The purpose of the client identifier is to associate a connection and its
* objects with a state maintained on behalf of the client by a provider.
* The only such state identified by the JMS API is that required to support
* durable subscriptions.
* <P>
* If another connection with the same <code>clientID</code> is already
* running when this method is called, the JMS provider should detect the
* duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
*
* @param newClientID the unique client identifier
* @throws JMSException if the JMS provider fails to set the client ID for
* this connection due to some internal error.
* @throws javax.jms.InvalidClientIDException if the JMS client specifies an
* invalid or duplicate client ID.
* @throws javax.jms.IllegalStateException if the JMS client attempts to set
* a connection's client ID at the wrong time or when it has
* been administratively configured.
*/
@Override
public void setClientID(String newClientID) throws JMSException {
checkClosedOrFailed();
if (this.clientIDSet) {
throw new IllegalStateException("The clientID has already been set");
}
if (this.isConnectionInfoSentToBroker) {
throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
}
this.info.setClientId(newClientID);
this.userSpecifiedClientID = true;
ensureConnectionInfoSent();
}
/**
* Sets the default client id that the connection will use if explicitly not
* set with the setClientId() call.
*/
public void setDefaultClientID(String clientID) throws JMSException {
this.info.setClientId(clientID);
this.userSpecifiedClientID = true;
}
/**
* Gets the metadata for this connection.
*
* @return the connection metadata
* @throws JMSException if the JMS provider fails to get the connection
* metadata for this connection.
* @see javax.jms.ConnectionMetaData
*/
@Override
public ConnectionMetaData getMetaData() throws JMSException {
checkClosedOrFailed();
return ActiveMQConnectionMetaData.INSTANCE;
}
/**
* Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
* every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
* associated with it.
*
* @return the <CODE>ExceptionListener</CODE> for this connection, or
* null, if no <CODE>ExceptionListener</CODE> is associated with
* this connection.
* @throws JMSException if the JMS provider fails to get the
* <CODE>ExceptionListener</CODE> for this connection.
* @see javax.jms.Connection#setExceptionListener(ExceptionListener)
*/
@Override
public ExceptionListener getExceptionListener() throws JMSException {
checkClosedOrFailed();
return this.exceptionListener;
}
/**
* Sets an exception listener for this connection.
* <P>
* If a JMS provider detects a serious problem with a connection, it informs
* the connection's <CODE> ExceptionListener</CODE>, if one has been
* registered. It does this by calling the listener's <CODE>onException
* </CODE>
* method, passing it a <CODE>JMSException</CODE> object describing the
* problem.
* <P>
* An exception listener allows a client to be notified of a problem
* asynchronously. Some connections only consume messages, so they would
* have no other way to learn their connection has failed.
* <P>
* A connection serializes execution of its <CODE>ExceptionListener</CODE>.
* <P>
* A JMS provider should attempt to resolve connection problems itself
* before it notifies the client of them.
*
* @param listener the exception listener
* @throws JMSException if the JMS provider fails to set the exception
* listener for this connection.
*/
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosedOrFailed();
this.exceptionListener = listener;
}
/**
* Gets the <code>ClientInternalExceptionListener</code> object for this connection.
* Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
* associated with it.
*
* @return the listener or <code>null</code> if no listener is registered with the connection.
*/
public ClientInternalExceptionListener getClientInternalExceptionListener() {
return clientInternalExceptionListener;
}
/**
* Sets a client internal exception listener for this connection.
* The connection will notify the listener, if one has been registered, of exceptions thrown by container components
* (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
* It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
* describing the problem.
*
* @param listener the exception listener
*/
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
this.clientInternalExceptionListener = listener;
}
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call
* to <CODE>start</CODE> on a connection that has already been started is
* ignored.
*
* @throws JMSException if the JMS provider fails to start message delivery
* due to some internal error.
* @see javax.jms.Connection#stop()
*/
@Override
public void start() throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = i.next();
session.start();
}
}
}
/**
* Temporarily stops a connection's delivery of incoming messages. Delivery
* can be restarted using the connection's <CODE>start</CODE> method. When
* the connection is stopped, delivery to all the connection's message
* consumers is inhibited: synchronous receives block, and messages are not
* delivered to message listeners.
* <P>
* This call blocks until receives and/or message listeners in progress have
* completed.
* <P>
* Stopping a connection has no effect on its ability to send messages. A
* call to <CODE>stop</CODE> on a connection that has already been stopped
* is ignored.
* <P>
* A call to <CODE>stop</CODE> must not return until delivery of messages
* has paused. This means that a client can rely on the fact that none of
* its message listeners will be called and that all threads of control
* waiting for <CODE>receive</CODE> calls to return will not return with a
* message until the connection is restarted. The receive timers for a
* stopped connection continue to advance, so receives may time out while
* the connection is stopped.
* <P>
* If message listeners are running when <CODE>stop</CODE> is invoked, the
* <CODE>stop</CODE> call must wait until all of them have returned before
* it may return. While these message listeners are completing, they must
* have the full services of the connection available to them.
*
* @throws JMSException if the JMS provider fails to stop message delivery
* due to some internal error.
* @see javax.jms.Connection#start()
*/
@Override
public void stop() throws JMSException {
doStop(true);
}
/**
* @see #stop()
* @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
* <tt>false</tt> to skip this check
* @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
*/
void doStop(boolean checkClosed) throws JMSException {
if (checkClosed) {
checkClosedOrFailed();
}
if (started.compareAndSet(true, false)) {
synchronized(sessions) {
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.stop();
}
}
}
}
/**
* Closes the connection.
* <P>
* Since a provider typically allocates significant resources outside the
* JVM on behalf of a connection, clients should close these resources when
* they are not needed. Relying on garbage collection to eventually reclaim
* these resources may not be timely enough.
* <P>
* There is no need to close the sessions, producers, and consumers of a
* closed connection.
* <P>
* Closing a connection causes all temporary destinations to be deleted.
* <P>
* When this method is invoked, it should not return until message
* processing has been shut down in an orderly fashion. This means that all
* message listeners that may have been running have returned, and that all
* pending receives have returned. A close terminates all pending message
* receives on the connection's sessions' consumers. The receives may return
* with a message or with null, depending on whether there was a message
* available at the time of the close. If one or more of the connection's
* sessions' message listeners is processing a message at the time when
* connection <CODE>close</CODE> is invoked, all the facilities of the
* connection and its sessions must remain available to those listeners
* until they return control to the JMS provider.
* <P>
* Closing a connection causes any of its sessions' transactions in progress
* to be rolled back. In the case where a session's work is coordinated by
* an external transaction manager, a session's <CODE>commit</CODE> and
* <CODE> rollback</CODE> methods are not used and the result of a closed
* session's work is determined later by the transaction manager. Closing a
* connection does NOT force an acknowledgment of client-acknowledged
* sessions.
* <P>
* Invoking the <CODE>acknowledge</CODE> method of a received message from
* a closed connection's session must throw an
* <CODE>IllegalStateException</CODE>. Closing a closed connection must
* NOT throw an exception.
*
* @throws JMSException if the JMS provider fails to close the connection
* due to some internal error. For example, a failure to
* release resources or to close a socket connection can
* cause this exception to be thrown.
*/
@Override
public void close() throws JMSException {
try {
// If we were running, lets stop first.
if (!closed.get() && !transportFailed.get()) {
// do not fail if already closed as according to JMS spec we must not
// throw exception if already closed
doStop(false);
}
synchronized (this) {
if (!closed.get()) {
closing.set(true);
if (destinationSource != null) {
destinationSource.stop();
destinationSource = null;
}
if (advisoryConsumer != null) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
Scheduler scheduler = this.scheduler;
if (scheduler != null) {
try {
scheduler.stop();
} catch (Exception e) {
JMSException ex = JMSExceptionSupport.create(e);
throw ex;
}
}
long lastDeliveredSequenceId = -1;
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
}
for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
this.activeTempDestinations.clear();
try {
if (isConnectionInfoSentToBroker) {
// If we announced ourselves to the broker.. Try to let the broker
// know that the connection is being shutdown.
RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
try {
syncSendPacket(removeCommand, closeTimeout);
} catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) {
// expected
} else {
throw e;
}
}
doAsyncSendPacket(new ShutdownInfo());
}
} finally { // release anyway even if previous communication fails
started.set(false);
// TODO if we move the TaskRunnerFactory to the connection
// factory
// then we may need to call
// factory.onConnectionClose(this);
if (sessionTaskRunner != null) {
sessionTaskRunner.shutdown();
}
closed.set(true);
closing.set(false);
}
}
}
} finally {
try {
if (executor != null) {
ThreadPoolUtils.shutdown(executor);
}
} catch (Throwable e) {
LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
}
ServiceSupport.dispose(this.transport);
factoryStats.removeConnection(this);
}
}
/**
* Tells the broker to terminate its VM. This can be used to cleanly
* terminate a broker running in a standalone java process. Server must have
* property enable.vm.shutdown=true defined to allow this to work.
*/
// TODO : org.apache.activemq.message.BrokerAdminCommand not yet
// implemented.
/*
* public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
* command = new BrokerAdminCommand();
* command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
* asyncSendPacket(command); }
*/
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
*
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param sessionPool the server session pool to associate with this durable
* connection consumer
* @param maxMessages the maximum number of messages that can be assigned to
* a server session at one time
* @return the durable connection consumer
* @throws JMSException if the <CODE>Connection</CODE> object fails to
* create a connection consumer due to some internal error
* or invalid arguments for <CODE>sessionPool</CODE> and
* <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException if an invalid destination
* is specified.
* @throws javax.jms.InvalidSelectorException if the message selector is
* invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
throws JMSException {
return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
}
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
*
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param sessionPool the server session pool to associate with this durable
* connection consumer
* @param maxMessages the maximum number of messages that can be assigned to
* a server session at one time
* @param noLocal set true if you want to filter out messages published
* locally
* @return the durable connection consumer
* @throws JMSException if the <CODE>Connection</CODE> object fails to
* create a connection consumer due to some internal error
* or invalid arguments for <CODE>sessionPool</CODE> and
* <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException if an invalid destination
* is specified.
* @throws javax.jms.InvalidSelectorException if the message selector is
* invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
boolean noLocal) throws JMSException {
checkClosedOrFailed();
if (queueOnlyConnection) {
throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
}
ensureConnectionInfoSent();
SessionId sessionId = new SessionId(info.getConnectionId(), -1);
ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
info.setSubscriptionName(subscriptionName);
info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages);
info.setDispatchAsync(isDispatchAsync());
// Allows the options on the destination to configure the consumerInfo
if (info.getDestination().getOptions() != null) {
Map<String, String> options = new HashMap<>(info.getDestination().getOptions());
IntrospectionSupport.setProperties(this.info, options, "consumer.");
}
return new ActiveMQConnectionConsumer(this, sessionPool, info);
}
// Properties
// -------------------------------------------------------------------------
/**
* Returns true if this connection has been started
*
* @return true if this Connection is started
*/
public boolean isStarted() {
return started.get();
}
/**
* Returns true if the connection is closed
*/
public boolean isClosed() {
return closed.get();
}
/**
* Returns true if the connection is in the process of being closed
*/
public boolean isClosing() {
return closing.get();
}
/**
* Returns true if the underlying transport has failed
*/
public boolean isTransportFailed() {
return transportFailed.get();
}
/**
* @return Returns the prefetchPolicy.
*/
public ActiveMQPrefetchPolicy getPrefetchPolicy() {
return prefetchPolicy;
}
/**
* Sets the <a
* href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
* policy</a> for consumers created by this connection.
*/
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
}
/**
*/
public Transport getTransportChannel() {
return transport;
}
/**
* @return Returns the clientID of the connection, forcing one to be
* generated if one has not yet been configured.
*/
public String getInitializedClientID() throws JMSException {
ensureConnectionInfoSent();
return info.getClientId();
}
/**
* @return Returns the timeStampsDisableByDefault.
*/
public boolean isDisableTimeStampsByDefault() {
return disableTimeStampsByDefault;
}
/**
* Sets whether or not timestamps on messages should be disabled or not. If
* you disable them it adds a small performance boost.
*/
public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
this.disableTimeStampsByDefault = timeStampsDisableByDefault;
}
/**
* @return Returns the dispatchOptimizedMessage.
*/
public boolean isOptimizedMessageDispatch() {
return optimizedMessageDispatch;
}
/**
* If this flag is set then an larger prefetch limit is used - only
* applicable for durable topic subscribers.
*/
public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
this.optimizedMessageDispatch = dispatchOptimizedMessage;
}
/**
* @return Returns the closeTimeout.
*/
public int getCloseTimeout() {
return closeTimeout;
}
/**
* Sets the timeout before a close is considered complete. Normally a
* close() on a connection waits for confirmation from the broker; this
* allows that operation to timeout to save the client hanging if there is
* no broker
*/
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
}
/**
* @return ConnectionInfo
*/
public ConnectionInfo getConnectionInfo() {
return this.info;
}
public boolean isUseRetroactiveConsumer() {
return useRetroactiveConsumer;
}
/**
* Sets whether or not retroactive consumers are enabled. Retroactive
* consumers allow non-durable topic subscribers to receive old messages
* that were published before the non-durable subscriber started.
*/
public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
this.useRetroactiveConsumer = useRetroactiveConsumer;
}
public boolean isNestedMapAndListEnabled() {
return nestedMapAndListEnabled;
}
/**
* Enables/disables whether or not Message properties and MapMessage entries
* support <a
* href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
* Structures</a> of Map and List objects
*/
public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
this.nestedMapAndListEnabled = structuredMapsEnabled;
}
public boolean isExclusiveConsumer() {
return exclusiveConsumer;
}
/**
* Enables or disables whether or not queue consumers should be exclusive or
* not for example to preserve ordering when not using <a
* href="http://activemq.apache.org/message-groups.html">Message Groups</a>
*
* @param exclusiveConsumer
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}
/**
* Adds a transport listener so that a client can be notified of events in
* the underlying transport
*/
public void addTransportListener(TransportListener transportListener) {
transportListeners.add(transportListener);
}
public void removeTransportListener(TransportListener transportListener) {
transportListeners.remove(transportListener);
}
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
public TaskRunnerFactory getSessionTaskRunner() {
synchronized (this) {
if (sessionTaskRunner == null) {
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
}
}
return sessionTaskRunner;
}
public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
this.sessionTaskRunner = sessionTaskRunner;
}
public MessageTransformer getTransformer() {
return transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on
* to the JMS bus or when they are received from the bus but before they are
* delivered to the JMS client
*/
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
/**
* @return the statsEnabled
*/
public boolean isStatsEnabled() {
return this.stats.isEnabled();
}
/**
* @param statsEnabled the statsEnabled to set
*/
public void setStatsEnabled(boolean statsEnabled) {
this.stats.setEnabled(statsEnabled);
}
/**
* Returns the {@link DestinationSource} object which can be used to listen to destinations
* being created or destroyed or to enquire about the current destinations available on the broker
*
* @return a lazily created destination source
* @throws JMSException
*/
@Override
public DestinationSource getDestinationSource() throws JMSException {
if (destinationSource == null) {
destinationSource = new DestinationSource(this);
destinationSource.start();
}
return destinationSource;
}
// Implementation methods
// -------------------------------------------------------------------------
/**
* Used internally for adding Sessions to the Connection
*
* @param session
* @throws JMSException
* @throws JMSException
*/
protected void addSession(ActiveMQSession session) throws JMSException {
this.sessions.add(session);
if (sessions.size() > 1 || session.isTransacted()) {
optimizedMessageDispatch = false;
}
}
/**
* Used interanlly for removing Sessions from a Connection
*
* @param session
*/
protected void removeSession(ActiveMQSession session) {
this.sessions.remove(session);
this.removeDispatcher(session);
}
/**
* Add a ConnectionConsumer
*
* @param connectionConsumer
* @throws JMSException
*/
protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
this.connectionConsumers.add(connectionConsumer);
}
/**
* Remove a ConnectionConsumer
*
* @param connectionConsumer
*/
protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
this.connectionConsumers.remove(connectionConsumer);
this.removeDispatcher(connectionConsumer);
}
/**
* Creates a <CODE>TopicSession</CODE> object.
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
* session is transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created topic session
* @throws JMSException if the <CODE>TopicConnection</CODE> object fails
* to create a session due to some internal error or lack of
* support for the specific transaction and acknowledgement
* mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
@Override
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param topic the topic to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param sessionPool the server session pool to associate with this
* connection consumer
* @param maxMessages the maximum number of messages that can be assigned to
* a server session at one time
* @return the connection consumer
* @throws JMSException if the <CODE>TopicConnection</CODE> object fails
* to create a connection consumer due to some internal
* error or invalid arguments for <CODE>sessionPool</CODE>
* and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException if an invalid topic is
* specified.
* @throws javax.jms.InvalidSelectorException if the message selector is
* invalid.
* @see javax.jms.ConnectionConsumer
*/
@Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param queue the queue to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param sessionPool the server session pool to associate with this
* connection consumer
* @param maxMessages the maximum number of messages that can be assigned to
* a server session at one time
* @return the connection consumer
* @throws JMSException if the <CODE>QueueConnection</CODE> object fails
* to create a connection consumer due to some internal
* error or invalid arguments for <CODE>sessionPool</CODE>
* and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException if an invalid queue is
* specified.
* @throws javax.jms.InvalidSelectorException if the message selector is
* invalid.
* @see javax.jms.ConnectionConsumer
*/
@Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param destination the destination to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
* empty string indicates that there is no message selector
* for the message consumer.
* @param sessionPool the server session pool to associate with this
* connection consumer
* @param maxMessages the maximum number of messages that can be assigned to
* a server session at one time
* @return the connection consumer
* @throws JMSException if the <CODE>Connection</CODE> object fails to
* create a connection consumer due to some internal error
* or invalid arguments for <CODE>sessionPool</CODE> and
* <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException if an invalid destination
* is specified.
* @throws javax.jms.InvalidSelectorException if the message selector is
* invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
}
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
ConsumerId consumerId = createConsumerId();
ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
consumerInfo.setSelector(messageSelector);
consumerInfo.setPrefetchSize(maxMessages);
consumerInfo.setNoLocal(noLocal);
consumerInfo.setDispatchAsync(isDispatchAsync());
// Allows the options on the destination to configure the consumerInfo
if (consumerInfo.getDestination().getOptions() != null) {
Map<String, String> options = new HashMap<>(consumerInfo.getDestination().getOptions());
IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
}
return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
}
/**
* @return a newly created ConsumedId unique to this connection session instance.
*/
private ConsumerId createConsumerId() {
return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
}
/**
* Creates a <CODE>QueueSession</CODE> object.
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
* session is transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created queue session
* @throws JMSException if the <CODE>QueueConnection</CODE> object fails
* to create a session due to some internal error or lack of
* support for the specific transaction and acknowledgement
* mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
@Override
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
/**
* Ensures that the clientID was manually specified and not auto-generated.
* If the clientID was not specified this method will throw an exception.
* This method is used to ensure that the clientID + durableSubscriber name
* are used correctly.
*
* @throws JMSException
*/
public void checkClientIDWasManuallySpecified() throws JMSException {
if (!userSpecifiedClientID) {
throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
}
}
/**
* send a Packet through the Connection - for internal use only
*
* @param command
* @throws JMSException
*/
public void asyncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
doAsyncSendPacket(command);
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
/**
* Send a packet through a Connection - for internal use only
*
* @param command
*
* @throws JMSException
*/
public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
if(onComplete==null) {
syncSendPacket(command);
} else {
if (isClosed()) {
throw new ConnectionClosedException();
}
try {
this.transport.asyncRequest(command, new ResponseCallback() {
@Override
public void onCompletion(FutureResponse resp) {
Response response;
Throwable exception = null;
try {
response = resp.getResult();
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
exception = er.getException();
}
} catch (Exception e) {
exception = e;
}
if (exception != null) {
if ( exception instanceof JMSException) {
onComplete.onException((JMSException) exception);
} else {
if (isClosed() || closing.get()) {
LOG.debug("Received an exception but connection is closing");
}
JMSException jmsEx = null;
try {
jmsEx = JMSExceptionSupport.create(exception);
} catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
}
// dispose of transport for security exceptions on connection initiation
if (exception instanceof SecurityException && command instanceof ConnectionInfo){
try {
forceCloseOnSecurityException(exception);
} catch (Throwable t) {
// We throw the original error from the ExceptionResponse instead.
}
}
if (jmsEx != null) {
onComplete.onException(jmsEx);
}
}
} else {
onComplete.onSuccess();
}
}
});
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
private void forceCloseOnSecurityException(Throwable exception) {
LOG.trace("force close on security exception:{}, transport={}", this, transport, exception);
onException(new IOException("Force close due to SecurityException on connect", exception));
}
public Response syncSendPacket(Command command, int timeout) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
try {
Response response = (Response)(timeout > 0
? this.transport.request(command, timeout)
: this.transport.request(command));
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
if (isClosed() || closing.get()) {
LOG.debug("Received an exception but connection is closing");
}
JMSException jmsEx = null;
try {
jmsEx = JMSExceptionSupport.create(er.getException());
} catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
}
if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
try {
forceCloseOnSecurityException(er.getException());
} catch (Throwable t) {
// We throw the original error from the ExceptionResponse instead.
}
}
if (jmsEx != null) {
throw jmsEx;
}
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
/**
* Send a packet through a Connection - for internal use only
*
* @param command
*
* @return the broker Response for the given Command.
*
* @throws JMSException
*/
public Response syncSendPacket(Command command) throws JMSException {
return syncSendPacket(command, 0);
}
/**
* @return statistics for this Connection
*/
@Override
public StatsImpl getStats() {
return stats;
}
/**
* simply throws an exception if the Connection is already closed or the
* Transport has failed
*
* @throws JMSException
*/
protected synchronized void checkClosedOrFailed() throws JMSException {
checkClosed();
if (transportFailed.get()) {
throw new ConnectionFailedException(firstFailureError);
}
}
/**
* simply throws an exception if the Connection is already closed
*
* @throws JMSException
*/
protected synchronized void checkClosed() throws JMSException {
if (closed.get()) {
throw new ConnectionClosedException();
}
}
/**
* Send the ConnectionInfo to the Broker
*
* @throws JMSException
*/
protected void ensureConnectionInfoSent() throws JMSException {
synchronized(this.ensureConnectionInfoSentMutex) {
// Can we skip sending the ConnectionInfo packet??
if (isConnectionInfoSentToBroker || closed.get()) {
return;
}
//TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
info.setClientId(clientIdGenerator.generateId());
}
syncSendPacket(info.copy(), getConnectResponseTimeout());
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that
// We know what the valid temporary destinations are on the
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
if (watchTopicAdvisories) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
}
}
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
this.watchTopicAdvisories = watchTopicAdvisories;
}
/**
* @return Returns the useAsyncSend.
*/
public boolean isUseAsyncSend() {
return useAsyncSend;
}
/**
* Forces the use of <a
* href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
* adds a massive performance boost; but means that the send() method will
* return immediately whether the message has been sent or not which could
* lead to message loss.
*/
public void setUseAsyncSend(boolean useAsyncSend) {
this.useAsyncSend = useAsyncSend;
}
/**
* @return true if always sync send messages
*/
public boolean isAlwaysSyncSend() {
return this.alwaysSyncSend;
}
/**
* Set true if always require messages to be sync sent
*
* @param alwaysSyncSend
*/
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
/**
* @return the messagePrioritySupported
*/
public boolean isMessagePrioritySupported() {
return this.messagePrioritySupported;
}
/**
* @param messagePrioritySupported the messagePrioritySupported to set
*/
public void setMessagePrioritySupported(boolean messagePrioritySupported) {
this.messagePrioritySupported = messagePrioritySupported;
}
/**
* Cleans up this connection so that it's state is as if the connection was
* just created. This allows the Resource Adapter to clean up a connection
* so that it can be reused without having to close and recreate the
* connection.
*/
public void cleanup() throws JMSException {
doCleanup(false);
}
public boolean isUserSpecifiedClientID() {
return userSpecifiedClientID;
}
public void doCleanup(boolean removeConnection) throws JMSException {
if (advisoryConsumer != null && !isTransportFailed()) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.dispose();
}
for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = i.next();
c.dispose();
}
if (removeConnection) {
if (isConnectionInfoSentToBroker) {
if (!transportFailed.get() && !closing.get()) {
syncSendPacket(info.createRemoveCommand());
}
isConnectionInfoSentToBroker = false;
}
if (userSpecifiedClientID) {
info.setClientId(null);
userSpecifiedClientID = false;
}
clientIDSet = false;
}
started.set(false);
}
/**
* Changes the associated username/password that is associated with this
* connection. If the connection has been used, you must called cleanup()
* before calling this method.
*
* @throws IllegalStateException if the connection is in used.
*/
public void changeUserInfo(String userName, String password) throws JMSException {
if (isConnectionInfoSentToBroker) {
throw new IllegalStateException("changeUserInfo used Connection is not allowed");
}
this.info.setUserName(userName);
this.info.setPassword(password);
}
/**
* @return Returns the resourceManagerId.
* @throws JMSException
*/
public String getResourceManagerId() throws JMSException {
if (isRmIdFromConnectionId()) {
return info.getConnectionId().getValue();
}
waitForBrokerInfo();
if (brokerInfo == null) {
throw new JMSException("Connection failed before Broker info was received.");
}
return brokerInfo.getBrokerId().getValue();
}
/**
* Returns the broker name if one is available or null if one is not
* available yet.
*/
public String getBrokerName() {
try {
brokerInfoReceived.await(5, TimeUnit.SECONDS);
if (brokerInfo == null) {
return null;
}
return brokerInfo.getBrokerName();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
/**
* Returns the broker information if it is available or null if it is not
* available yet.
*/
public BrokerInfo getBrokerInfo() {
return brokerInfo;
}
/**
* @return Returns the RedeliveryPolicy.
* @throws JMSException
*/
public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
return redeliveryPolicyMap.getDefaultEntry();
}
/**
* Sets the redelivery policy to be used when messages are rolled back
*/
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
}
public BlobTransferPolicy getBlobTransferPolicy() {
if (blobTransferPolicy == null) {
blobTransferPolicy = createBlobTransferPolicy();
}
return blobTransferPolicy;
}
/**
* Sets the policy used to describe how out-of-band BLOBs (Binary Large
* OBjects) are transferred from producers to brokers to consumers
*/
public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
this.blobTransferPolicy = blobTransferPolicy;
}
/**
* @return Returns the alwaysSessionAsync.
*/
public boolean isAlwaysSessionAsync() {
return alwaysSessionAsync;
}
/**
* If this flag is not set then a separate thread is not used for dispatching messages for each Session in
* the Connection. However, a separate thread is always used if there is more than one session, or the session
* isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch
* happens asynchronously.
*/
public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
this.alwaysSessionAsync = alwaysSessionAsync;
}
/**
* @return Returns the optimizeAcknowledge.
*/
public boolean isOptimizeAcknowledge() {
return optimizeAcknowledge;
}
/**
* Enables an optimised acknowledgement mode where messages are acknowledged
* in batches rather than individually
*
* @param optimizeAcknowledge The optimizeAcknowledge to set.
*/
public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
this.optimizeAcknowledge = optimizeAcknowledge;
}
/**
* The max time in milliseconds between optimized ack batches
* @param optimizeAcknowledgeTimeOut
*/
public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
}
public long getOptimizeAcknowledgeTimeOut() {
return optimizeAcknowledgeTimeOut;
}
public long getWarnAboutUnstartedConnectionTimeout() {
return warnAboutUnstartedConnectionTimeout;
}
/**
* Enables the timeout from a connection creation to when a warning is
* generated if the connection is not properly started via {@link #start()}
* and a message is received by a consumer. It is a very common gotcha to
* forget to <a
* href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
* the connection</a> so this option makes the default case to create a
* warning if the user forgets. To disable the warning just set the value to <
* 0 (say -1).
*/
public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
}
/**
* @return the sendTimeout (in milliseconds)
*/
public int getSendTimeout() {
return sendTimeout;
}
/**
* @param sendTimeout the sendTimeout to set (in milliseconds)
*/
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
/**
* @return the sendAcksAsync
*/
public boolean isSendAcksAsync() {
return sendAcksAsync;
}
/**
* @param sendAcksAsync the sendAcksAsync to set
*/
public void setSendAcksAsync(boolean sendAcksAsync) {
this.sendAcksAsync = sendAcksAsync;
}
/**
* Returns the time this connection was created
*/
public long getTimeCreated() {
return timeCreated;
}
private void waitForBrokerInfo() throws JMSException {
try {
brokerInfoReceived.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw JMSExceptionSupport.create(e);
}
}
// Package protected so that it can be used in unit tests
public Transport getTransport() {
return transport;
}
public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
producers.put(producerId, producer);
}
public void removeProducer(ProducerId producerId) {
producers.remove(producerId);
}
public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
dispatchers.put(consumerId, dispatcher);
}
public void removeDispatcher(ConsumerId consumerId) {
dispatchers.remove(consumerId);
}
public boolean hasDispatcher(ConsumerId consumerId) {
return dispatchers.containsKey(consumerId);
}
/**
* @param o - the command to consume
*/
@Override
public void onCommand(final Object o) {
final Command command = (Command)o;
if (!closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md) throws Exception {
waitForTransportInterruptionProcessingToComplete();
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
}
dispatcher.dispatch(md);
} else {
LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
}
return null;
}
@Override
public Response processProducerAck(ProducerAck pa) throws Exception {
if (pa != null && pa.getProducerId() != null) {
ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
if (producer != null) {
producer.onProducerAck(pa);
}
}
return null;
}
@Override
public Response processBrokerInfo(BrokerInfo info) throws Exception {
brokerInfo = info;
brokerInfoReceived.countDown();
optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
return null;
}
@Override
public Response processConnectionError(final ConnectionError error) throws Exception {
executor.execute(new Runnable() {
@Override
public void run() {
onAsyncException(error.getException());
}
});
return null;
}
@Override
public Response processControlCommand(ControlCommand command) throws Exception {
return null;
}
@Override
public Response processConnectionControl(ConnectionControl control) throws Exception {
onConnectionControl((ConnectionControl)command);
return null;
}
@Override
public Response processConsumerControl(ConsumerControl control) throws Exception {
onConsumerControl((ConsumerControl)command);
return null;
}
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
onWireFormatInfo((WireFormatInfo)command);
return null;
}
});
} catch (Exception e) {
onClientInternalException(e);
}
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
}
}
protected void onWireFormatInfo(WireFormatInfo info) {
protocolVersion.set(info.getVersion());
}
/**
* Handles async client internal exceptions.
* A client internal exception is usually one that has been thrown
* by a container runtime component during asynchronous processing of a
* message that does not affect the connection itself.
* This method notifies the <code>ClientInternalExceptionListener</code> by invoking
* its <code>onException</code> method, if one has been registered with this connection.
*
* @param error the exception that the problem
*/
public void onClientInternalException(final Throwable error) {
if ( !closed.get() && !closing.get() ) {
if ( this.clientInternalExceptionListener != null ) {
executor.execute(new Runnable() {
@Override
public void run() {
ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
}
});
} else {
LOG.debug("Async client internal exception occurred with no exception listener registered: {}",
error, error);
}
}
}
/**
* Used for handling async exceptions
*
* @param error
*/
public void onAsyncException(Throwable error) {
if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) {
if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error);
}
final JMSException e = (JMSException)error;
executor.execute(new Runnable() {
@Override
public void run() {
ActiveMQConnection.this.exceptionListener.onException(e);
}
});
} else {
LOG.debug("Async exception with no exception listener: {}", error, error);
}
}
}
@Override
public void onException(final IOException error) {
onAsyncException(error);
if (!closed.get() && !closing.get()) {
executor.execute(new Runnable() {
@Override
public void run() {
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
try {
doCleanup(true);
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
}
});
}
}
@Override
public void transportInterupted() {
transportInterruptionProcessingComplete.set(1);
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress(transportInterruptionProcessingComplete);
}
for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
}
if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
}
signalInterruptionProcessingNeeded();
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportInterupted();
}
}
@Override
public void transportResumed() {
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportResumed();
}
}
/**
* Create the DestinationInfo object for the temporary destination.
*
* @param topic - if its true topic, else queue.
* @return DestinationInfo
* @throws JMSException
*/
protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
// Check if Destination info is of temporary type.
ActiveMQTempDestination dest;
if (topic) {
dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
} else {
dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
}
DestinationInfo info = new DestinationInfo();
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
info.setDestination(dest);
syncSendPacket(info);
dest.setConnection(this);
activeTempDestinations.put(dest, dest);
return dest;
}
/**
* @param destination
* @throws JMSException
*/
public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
checkClosedOrFailed();
for (ActiveMQSession session : this.sessions) {
if (session.isInUse(destination)) {
throw new JMSException("A consumer is consuming from the temporary destination");
}
}
activeTempDestinations.remove(destination);
DestinationInfo destInfo = new DestinationInfo();
destInfo.setConnectionId(this.info.getConnectionId());
destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
destInfo.setDestination(destination);
destInfo.setTimeout(0);
syncSendPacket(destInfo);
}
public boolean isDeleted(ActiveMQDestination dest) {
// If we are not watching the advisories.. then
// we will assume that the temp destination does exist.
if (advisoryConsumer == null) {
return false;
}
return !activeTempDestinations.containsValue(dest);
}
public boolean isCopyMessageOnSend() {
return copyMessageOnSend;
}
public LongSequenceGenerator getLocalTransactionIdGenerator() {
return localTransactionIdGenerator;
}
public boolean isUseCompression() {
return useCompression;
}
/**
* Enables the use of compression of the message bodies
*/
public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}
public void destroyDestination(ActiveMQDestination destination) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
DestinationInfo info = new DestinationInfo();
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
info.setTimeout(0);
syncSendPacket(info);
}
public boolean isDispatchAsync() {
return dispatchAsync;
}
/**
* Enables or disables the default setting of whether or not consumers have
* their messages <a
* href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
* synchronously or asynchronously by the broker</a>. For non-durable
* topics for example we typically dispatch synchronously by default to
* minimize context switches which boost performance. However sometimes its
* better to go slower to ensure that a single blocked consumer socket does
* not block delivery to other consumers.
*
* @param asyncDispatch If true then consumers created on this connection
* will default to having their messages dispatched
* asynchronously. The default value is true.
*/
public void setDispatchAsync(boolean asyncDispatch) {
this.dispatchAsync = asyncDispatch;
}
public boolean isObjectMessageSerializationDefered() {
return objectMessageSerializationDefered;
}
/**
* When an object is set on an ObjectMessage, the JMS spec requires the
* object to be serialized by that set method. Enabling this flag causes the
* object to not get serialized. The object may subsequently get serialized
* if the message needs to be sent over a socket or stored to disk.
*/
public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
this.objectMessageSerializationDefered = objectMessageSerializationDefered;
}
/**
* Unsubscribes a durable subscription that has been created by a client.
* <P>
* This method deletes the state being maintained on behalf of the
* subscriber by its provider.
* <P>
* It is erroneous for a client to delete a durable subscription while there
* is an active <CODE>MessageConsumer </CODE> or
* <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
* message is part of a pending transaction or has not been acknowledged in
* the session.
*
* @param name the name used to identify this subscription
* @throws JMSException if the session fails to unsubscribe to the durable
* subscription due to some internal error.
* @throws InvalidDestinationException if an invalid subscription name is
* specified.
* @since 1.1
*/
public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(getConnectionInfo().getConnectionId());
rsi.setSubscriptionName(name);
rsi.setClientId(getConnectionInfo().getClientId());
syncSendPacket(rsi);
}
/**
* Internal send method optimized: - It does not copy the message - It can
* only handle ActiveMQ messages. - You can specify if the send is async or
* sync - Does not allow you to send /w a transaction.
*/
void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
checkClosedOrFailed();
if (destination.isTemporary() && isDeleted(destination)) {
throw new JMSException("Cannot publish to a deleted Destination: " + destination);
}
msg.setJMSDestination(destination);
msg.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!isDisableTimeStampsByDefault()) {
long timeStamp = System.currentTimeMillis();
msg.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
msg.setJMSExpiration(expiration);
msg.setJMSPriority(priority);
msg.setJMSRedelivered(false);
msg.setMessageId(messageId);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending message: " + msg);
}
if (async) {
asyncSendPacket(msg);
} else {
syncSendPacket(msg);
}
}
protected void onConnectionControl(ConnectionControl command) {
if (command.isFaultTolerant()) {
this.optimizeAcknowledge = false;
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.setOptimizeAcknowledge(false);
}
}
}
protected void onConsumerControl(ConsumerControl command) {
if (command.isClose()) {
for (ActiveMQSession session : this.sessions) {
session.close(command.getConsumerId());
}
} else {
for (ActiveMQSession session : this.sessions) {
session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
}
for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
consumerInfo.setPrefetchSize(command.getPrefetch());
}
}
}
}
protected void transportFailed(IOException error) {
transportFailed.set(true);
if (firstFailureError == null) {
firstFailureError = error;
}
}
/**
* Should a JMS message be copied to a new JMS Message object as part of the
* send() method in JMS. This is enabled by default to be compliant with the
* JMS specification. You can disable it if you do not mutate JMS messages
* after they are sent for a performance boost
*/
public void setCopyMessageOnSend(boolean copyMessageOnSend) {
this.copyMessageOnSend = copyMessageOnSend;
}
@Override
public String toString() {
return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
}
protected BlobTransferPolicy createBlobTransferPolicy() {
return new BlobTransferPolicy();
}
public int getProtocolVersion() {
return protocolVersion.get();
}
public int getProducerWindowSize() {
return producerWindowSize;
}
public void setProducerWindowSize(int producerWindowSize) {
this.producerWindowSize = producerWindowSize;
}
public void setAuditDepth(int auditDepth) {
connectionAudit.setAuditDepth(auditDepth);
}
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
}
protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
connectionAudit.removeDispatcher(dispatcher);
}
protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
}
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
connectionAudit.rollbackDuplicate(dispatcher, message);
}
public IOException getFirstFailureError() {
return firstFailureError;
}
protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
signalInterruptionProcessingComplete();
}
}
protected void transportInterruptionProcessingComplete() {
if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
signalInterruptionProcessingComplete();
}
}
private void signalInterruptionProcessingComplete() {
if (LOG.isDebugEnabled()) {
LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
+ " for:" + this.getConnectionInfo().getConnectionId());
}
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
if (failoverTransport != null) {
failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
if (LOG.isDebugEnabled()) {
LOG.debug("notified failover transport (" + failoverTransport
+ ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
}
}
transportInterruptionProcessingComplete.set(0);
}
private void signalInterruptionProcessingNeeded() {
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
if (failoverTransport != null) {
failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
if (LOG.isDebugEnabled()) {
LOG.debug("notified failover transport (" + failoverTransport
+ ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
}
}
}
/*
* specify the amount of time in milliseconds that a consumer with a transaction pending recovery
* will wait to receive re dispatched messages.
* default value is 0 so there is no wait by default.
*/
public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
}
public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod;
}
protected Scheduler getScheduler() throws JMSException {
Scheduler result = scheduler;
if (result == null) {
if (isClosing() || isClosed()) {
// without lock contention report the closing state
throw new ConnectionClosedException();
}
synchronized (schedulerLock) {
result = scheduler;
if (result == null) {
checkClosed();
try {
result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
result.start();
scheduler = result;
} catch(Exception e) {
throw JMSExceptionSupport.create(e);
}
}
}
}
return result;
}
protected ThreadPoolExecutor getExecutor() {
return this.executor;
}
protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
return sessions;
}
/**
* @return the checkForDuplicates
*/
public boolean isCheckForDuplicates() {
return this.checkForDuplicates;
}
/**
* @param checkForDuplicates the checkForDuplicates to set
*/
public void setCheckForDuplicates(boolean checkForDuplicates) {
this.checkForDuplicates = checkForDuplicates;
}
public boolean isTransactedIndividualAck() {
return transactedIndividualAck;
}
public void setTransactedIndividualAck(boolean transactedIndividualAck) {
this.transactedIndividualAck = transactedIndividualAck;
}
public boolean isNonBlockingRedelivery() {
return nonBlockingRedelivery;
}
public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
this.nonBlockingRedelivery = nonBlockingRedelivery;
}
public boolean isRmIdFromConnectionId() {
return rmIdFromConnectionId;
}
public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
this.rmIdFromConnectionId = rmIdFromConnectionId;
}
/**
* Removes any TempDestinations that this connection has cached, ignoring
* any exceptions generated because the destination is in use as they should
* not be removed.
* Used from a pooled connection, b/c it will not be explicitly closed.
*/
public void cleanUpTempDestinations() {
if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
return;
}
Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
= this.activeTempDestinations.entrySet().iterator();
while(entries.hasNext()) {
ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
try {
// Only delete this temp destination if it was created from this connection. The connection used
// for the advisory consumer may also have a reference to this temp destination.
ActiveMQTempDestination dest = entry.getValue();
String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
this.deleteTempDestination(entry.getValue());
}
} catch (Exception ex) {
// the temp dest is in use so it can not be deleted.
// it is ok to leave it to connection tear down phase
}
}
}
/**
* Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
* @param redeliveryPolicyMap the redeliveryPolicyMap to set
*/
public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
this.redeliveryPolicyMap = redeliveryPolicyMap;
}
/**
* Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
* Consumers when dealing with transaction messages that have been rolled back.
*
* @return the redeliveryPolicyMap
*/
public RedeliveryPolicyMap getRedeliveryPolicyMap() {
return redeliveryPolicyMap;
}
public int getMaxThreadPoolSize() {
return maxThreadPoolSize;
}
public void setMaxThreadPoolSize(int maxThreadPoolSize) {
this.maxThreadPoolSize = maxThreadPoolSize;
}
/**
* Enable enforcement of QueueConnection semantics.
*
* @return this object, useful for chaining
*/
ActiveMQConnection enforceQueueOnlyConnection() {
this.queueOnlyConnection = true;
return this;
}
public RejectedExecutionHandler getRejectedTaskHandler() {
return rejectedTaskHandler;
}
public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
this.rejectedTaskHandler = rejectedTaskHandler;
}
/**
* Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
* to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
* will not do any background Message acknowledgment.
*
* @return the scheduledOptimizedAckInterval
*/
public long getOptimizedAckScheduledAckInterval() {
return optimizedAckScheduledAckInterval;
}
/**
* Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
* have been configured with optimizeAcknowledge enabled.
*
* @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
*/
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
}
/**
* @return true if MessageConsumer instance will check for expired messages before dispatch.
*/
public boolean isConsumerExpiryCheckEnabled() {
return consumerExpiryCheckEnabled;
}
/**
* Controls whether message expiration checking is done in each MessageConsumer
* prior to dispatching a message. Disabling this check can lead to consumption
* of expired messages.
*
* @param consumerExpiryCheckEnabled
* controls whether expiration checking is done prior to dispatch.
*/
public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
}
public List<String> getTrustedPackages() {
return trustedPackages;
}
public void setTrustedPackages(List<String> trustedPackages) {
this.trustedPackages = trustedPackages;
}
public boolean isTrustAllPackages() {
return trustAllPackages;
}
public void setTrustAllPackages(boolean trustAllPackages) {
this.trustAllPackages = trustAllPackages;
}
public int getConnectResponseTimeout() {
return connectResponseTimeout;
}
public void setConnectResponseTimeout(int connectResponseTimeout) {
this.connectResponseTimeout = connectResponseTimeout;
}
}