blob: 4426fb6866694e94cecd7fc44b3c6b1398a60e5a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageEOFException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.StreamMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.jms.Session;
import org.apache.qpid.jndi.ObjectFactory;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.url.URLSyntaxException;
public class AMQConnection extends Closeable implements CommonConnection, Referenceable,
ClassLoadingAwareObjectInputStream.TrustedClassFilter
{
/** String constant "connectionURL"; presumably the JNDI address type used for this connection's Reference - confirm against getReference(). */
public static final String JNDI_ADDRESS_CONNECTION_URL = "connectionURL";
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
// Runs before the static fields below are initialised (static initialisers execute in textual order),
// so ClientProperties has been asked to load before DEFAULT_CLOSE_TIMEOUT reads a system property.
static
{
ClientProperties.ensureIsLoaded();
}
/** Source of the per-connection sequence number used in log messages and thread names. */
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
/** Timeout handed to {@link #close()}; taken from a system property, falling back to the client default. */
private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT,
ClientProperties.DEFAULT_CLOSE_TIMEOUT);
/** Sequence number of this connection instance, assigned at construction. */
private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
/** Comma-separated white list entries taken from the connection URL option or the system property default. */
private final List<String> _whiteListedClassHierarchies;
/** Comma-separated black list entries taken from the connection URL option or the system property default. */
private final List<String> _blackListedClassHierarchies;
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
* held by any child objects of this connection such as the session, producers and consumers.
*/
private final Object _failoverMutex = new Object();
/** Guards session creation, the deferred initial connect, client-id changes and the closed-state transition. */
private final Object _sessionCreationLock = new Object();
/**
* A channel is roughly analogous to a session. The maximum number of channels is compared against the number of
* sessions held by this connection (see channelLimitReached()), so the client cannot open too many.
*/
private long _maximumChannelCount;
/** The maximum size of frame supported by the server */
private long _maximumFrameSize;
/**
* The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
* the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
* handler.
*/
private final AMQProtocolHandler _protocolHandler;
/** Maps from channel id to AMQSession instance */
private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
/** The client id; taken from the connection URL, set via setClientID(), or generated in makeConnection(). */
private String _clientName;
/** The user name to use for authentication */
private String _username;
/** The password to use for authentication */
private String _password;
/** The virtual host to connect to on the AMQ server (stored without a leading '/') */
private String _virtualHost;
/** The exception listener for this connection object. */
private volatile ExceptionListener _exceptionListener;
private ConnectionListener _connectionListener;
/** The URL this connection was constructed from. */
private final ConnectionURL _connectionURL;
/**
* Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
* publication.
*/
private volatile boolean _started;
/** Policy dictating how to failover */
private FailoverPolicy _failoverPolicy;
/*
* _Connected should be refactored with a suitable wait object.
*/
private boolean _connected;
/** Set once makeConnection() has been entered, whether or not the attempt succeeded. */
private boolean _connectionAttempted;
/*
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
// Exchange names default to the standard topic/direct exchanges and may be overridden by the connection URL.
private String _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private String _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
private String _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private String _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
/**
* Thread Pool for executing connection level processes such as reporting asynchronous exceptions
* and for 0-8..0-91 returning bounced messages.
* <p>
* A single-threaded scheduled executor whose thread is named "Connection_&lt;connectionNumber&gt;_task"
* and is always a daemon thread. It is shut down by shutdownTaskPool().
*/
private final ScheduledExecutorService _taskPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
{
@Override
public Thread newThread(final Runnable r)
{
final String name = "Connection_" + AMQConnection.this._connectionNumber + "_task";
_logger.debug("Creating connection pooled thread '{}'", name);
Thread thread = new Thread(r, name);
// A new thread inherits the daemon status of its creator; force daemon so the pool never keeps the JVM alive.
if (!thread.isDaemon())
{
thread.setDaemon(true);
}
return thread;
}
});
// Protocol-version specific behaviour; chosen in the constructor and replaced by initDelegate()
// when the broker reports a protocol version.
private AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
private int _maxPrefetch;
//Indicates whether persistent messages are synchronized (deprecated option, see constructor warning)
private boolean _syncPersistence;
//Indicates whether we need to sync on every message ack
private boolean _syncAck;
//Indicates whether we need to sync on every message ack on a client ack session, default to true
private final boolean _syncClientAck;
//Indicates the sync publish options (persistent|all)
//By default it's async publish
private String _syncPublish = "";
//Indicates whether user-id should be attached to every sent message
//By default the user ID is attached
private boolean _populateUserId = true;
// Indicates whether to use the old map message format or the
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
// Indicates whether to use the old stream message format or the
// new amqp-0-10 list encoded format.
private boolean _useLegacyStreamMessageFormat;
// When sending to a Queue destination for the first time, check that the queue is bound
private final boolean _validateQueueOnSend;
//used to track the last failover time for
//Address resolution purposes
private volatile long _lastFailoverTime = 0;
// Whether message compression is enabled for this connection
private boolean _compressMessages;
// Message compression threshold; non-positive configured values are replaced by Integer.MAX_VALUE in the constructor
private int _messageCompressionThresholdSize;
// Properties read from the broker by retrieveVirtualHostPropertiesIfNecessary(); also used as that method's lock
private final Map<String, String> _virtualHostProperties = new HashMap<>();
// Set once the virtual host properties have been fetched, so the fetch happens at most once
private volatile boolean _virtualHostPropertiesPopulated;
// Class initialisation: logs the client version and obtains the CallbackHandlerRegistry singleton.
static
{
if (_logger.isDebugEnabled())
{
_logger.debug("Qpid version : " + CommonProperties.getVersionString());
}
// The registering of any additional SASL mechanisms with the Java Security API requires
// SecurityManager permissions. In execution environments such as web containers,
// this may require adjustments to the Java security.policy.
// getInstance() is called unconditionally; the result is otherwise only used for the debug log below.
CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance();
if (_logger.isDebugEnabled())
{
_logger.debug("Loaded mechanisms " + registry.getMechanisms());
}
}
private ConnectionSettings _connectionSettings;
// Cache of trust stores fetched from the broker, keyed by the name passed to getBrokerSuppliedTrustStore();
// also used as that method's lock.
private final ConcurrentMap<String, KeyStore> _brokerTrustStores = new ConcurrentHashMap<>();
// Session created on first use by getBrokerSuppliedTrustStore() and reused for later lookups.
private Session _brokerTrustStoreSession;
/**
* Creates a connection by assembling a connection URL of the form
* {@code <protocol>://<username>:<password>@<clientName>/<virtualHost>?brokerlist='<broker>'}
* and delegating to {@link #AMQConnection(ConnectionURL)}.
*
* @param broker broker details string; passed through {@code BrokerDetails.checkTransport}
* @param username username
* @param password password
* @param clientName client id; {@code null} is rendered as an empty string
* @param virtualHost virtual host; a "/" separator is inserted in front of it
*
* @throws QpidException
* @throws URLSyntaxException
*/
// NOTE(review): the credentials are concatenated into the URL unescaped; characters such as '@' or ':'
// presumably confuse URL parsing - confirm against AMQConnectionURL.
public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
throws QpidException, URLSyntaxException
{
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ BrokerDetails.checkTransport(broker) + "'"));
}
/**
* Creates a connection to a single TCP broker by assembling a connection URL of the form
* {@code <protocol>://<username>:<password>@<clientName><virtualHost>?brokerlist='tcp://<host>:<port>'}
* and delegating to {@link #AMQConnection(ConnectionURL)}.
*
* @param host broker host name
* @param port broker port
* @param username username
* @param password password
* @param clientName client id; {@code null} is rendered as an empty string
* @param virtualHost virtual host, appended directly after the client name
*
* @throws QpidException
* @throws URLSyntaxException
*/
// NOTE(review): unlike the (broker, username, ...) constructor no "/" is inserted between the client name
// and the virtual host, so the caller presumably has to supply the leading "/" itself - confirm with callers.
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
throws QpidException, URLSyntaxException
{
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"));
}
/**
* Creates a connection from a connection URL string by parsing it into an {@code AMQConnectionURL}
* and delegating to {@link #AMQConnection(ConnectionURL)}.
*
* @param connection the connection URL string
*
* @throws QpidException
* @throws URLSyntaxException
*/
public AMQConnection(String connection) throws QpidException, URLSyntaxException
{
this(new AMQConnectionURL(connection));
}
/**
* Primary constructor. Reads the connection options (each URL option takes precedence over the corresponding
* system property default), selects a protocol delegate from the configured AMQP version, creates the protocol
* handler and, if the URL carries a client name, connects immediately. Without a client name the connect is
* deferred until the first session is created, which leaves room for a later setClientID() call.
* <p>
* If construction fails for any reason the connection's task pool is shut down before the exception propagates.
*
* @param connectionURL the connection URL; must not be null
*
* @throws QpidException if the initial connection attempt fails
* @throws IllegalArgumentException if connectionURL is null
*/
public AMQConnection(ConnectionURL connectionURL) throws QpidException
{
// Tracks whether the constructor ran to completion; checked in the finally block below.
boolean success = false;
try
{
if (connectionURL == null)
{
throw new IllegalArgumentException("Connection must be specified");
}
if (_logger.isDebugEnabled())
{
_logger.debug("Connection(" + _connectionNumber + "):" + connectionURL);
}
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
_maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the default value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
ClientProperties.MAX_PREFETCH_DEFAULT));
}
// sync_persistence is deprecated: a warning is logged whenever it is set via the URL,
// or enabled via the system property.
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
_syncPersistence =
Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
_logger.warn("sync_persistence is a deprecated property, " +
"please use sync_publish={persistent|all} instead");
}
else
{
// use the default value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
if (_syncPersistence)
{
_logger.warn("sync_persistence is a deprecated property, " +
"please use sync_publish={persistent|all} instead");
}
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
{
_syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
}
else
{
// use the default value set for all connections
_syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_CLIENT_ACK) != null)
{
_syncClientAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_CLIENT_ACK));
}
else
{
// Precedence: current system property, then the deprecated legacy property, then "true".
String legacyProperty = System.getProperty("qpid.sync_after_client.ack");
if (legacyProperty != null)
{
_logger.warn("'qpid.sync_after_client.ack' is a deprecated system property, " +
"please use '{}' instead", ClientProperties.SYNC_CLIENT_ACK);
}
_syncClientAck = Boolean.parseBoolean(System.getProperty(ClientProperties.SYNC_CLIENT_ACK,
legacyProperty != null ? legacyProperty : "true"));
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
{
_syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
}
else
{
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
}
// No system property fallback here: without the URL option the field keeps its initial value (true).
if (connectionURL.getOption(ConnectionURL.OPTIONS_POPULATE_USER_ID) != null)
{
_populateUserId = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_POPULATE_USER_ID));
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
{
_useLegacyMapMessageFormat = Boolean.parseBoolean(
connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT));
}
else
{
// use the default value set for all connections
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null)
{
_useLegacyStreamMessageFormat = Boolean.parseBoolean(
connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT));
}
else
{
// use the default value set for all connections
// Unlike the map message format, the legacy stream format defaults to true when the property is unset.
_useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null ?
true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
}
if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null)
{
_validateQueueOnSend = Boolean.parseBoolean(
connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
}
else
{
_validateQueueOnSend =
Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
}
if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null)
{
_compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES));
}
else
{
_compressMessages =
Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES,
String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES)));
}
if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null)
{
_messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE));
}
else
{
_messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE,
ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE);
}
// A non-positive threshold is replaced by the largest possible value.
if(_messageCompressionThresholdSize <= 0)
{
_messageCompressionThresholdSize = Integer.MAX_VALUE;
}
// Initial protocol version comes from a system property and defaults to 0-10;
// any unrecognised value also selects the 0-10 delegate.
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
if (_logger.isDebugEnabled())
{
_logger.debug("AMQP version " + amqpVersion);
}
_failoverPolicy = new FailoverPolicy(connectionURL, this);
if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
}
else if ("0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_9(this);
}
else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_91(this);
}
else
{
_delegate = new AMQConnectionDelegate_0_10(this);
}
_connectionURL = connectionURL;
_clientName = connectionURL.getClientName();
_username = connectionURL.getUsername();
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
// Exchange names from the URL override the field defaults only when present.
if (connectionURL.getDefaultQueueExchangeName() != null)
{
_defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
}
if (connectionURL.getDefaultTopicExchangeName() != null)
{
_defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
}
if (connectionURL.getTemporaryQueueExchangeName() != null)
{
_temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
}
if (connectionURL.getTemporaryTopicExchangeName() != null)
{
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
_protocolHandler = new AMQProtocolHandler(this);
if (_logger.isDebugEnabled())
{
_logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
}
// We are not currently connected
setConnected(false);
// Connect eagerly only when a client id was supplied; otherwise createSession() connects on first use.
if (_clientName != null)
{
makeConnection();
}
_connectionMetaData = new QpidConnectionMetaData();
// The class hierarchy white/black lists are assigned only after the (optional) connect above,
// so they are still null while makeConnection() runs.
if (connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST) != null)
{
String whiteListedClassHierarchiesString = connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST);
_whiteListedClassHierarchies = Arrays.asList(whiteListedClassHierarchiesString.split(","));
}
else
{
final String defaultWhiteListedClassHierarchiesString = System.getProperty(CommonProperties.QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST, "*");
_whiteListedClassHierarchies = Arrays.asList(defaultWhiteListedClassHierarchiesString.split(","));
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST) != null)
{
String blackListedClassHierarchiesString = connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST);
_blackListedClassHierarchies = Arrays.asList(blackListedClassHierarchiesString.split(","));
}
else
{
// Note: splitting the default "" yields a list holding a single empty string, not an empty list.
final String defaultBlackListedClassHierarchiesString = System.getProperty(CommonProperties.QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST, "");
_blackListedClassHierarchies = Arrays.asList(defaultBlackListedClassHierarchiesString.split(","));
}
success = true;
}
finally
{
// On any failure release the task pool created by the field initialiser.
if (!success)
{
shutdownTaskPool();
}
}
}
/**
* Performs the initial connect. Generates a client name if none is set, then tries brokers supplied by the
* failover policy until one connects, retries are no longer allowed, or no broker is left. A
* ConnectionRedirectException causes a retry against the host and port named by the redirect.
* On success the session map's channel id range is taken from the delegate.
*
* @throws AMQUnresolvedAddressException if the failure's cause chain contains an unresolved or unknown host
* @throws AMQConnectionFailureException if no connection could be established for any other reason
* @throws QpidException declared; presumably also raised by verifyClientID() - confirm
*/
private void makeConnection() throws QpidException
{
// Recorded up front, so a failed attempt still counts as "attempted" (see setClientID/createSession).
_connectionAttempted = true;
if(_clientName == null)
{
try
{
InetAddress addr = InetAddress.getLocalHost();
_clientName = addr.getHostName() + System.currentTimeMillis();
}
catch (UnknownHostException e)
{
_clientName = "UnknownHost" + UUID.randomUUID();
}
}
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
boolean retryAllowed = true;
// Holds the most recent failure; it is not reset between iterations.
Exception connectionException = null;
while (!isConnected() && retryAllowed && brokerDetails != null)
{
ProtocolVersion pe = null;
try
{
pe = makeBrokerConnection(brokerDetails);
}
catch (Exception e)
{
if (_logger.isInfoEnabled())
{
_logger.info("Unable to connect to broker at " +
_failoverPolicy.getCurrentBrokerDetails(),
e);
}
connectionException = e;
}
if (pe != null)
{
// reset the delegate to the version returned by the
// broker
initDelegate(pe);
}
else if (!isConnected())
{
if(connectionException instanceof ConnectionRedirectException)
{
// Retry against a copy of the current broker details pointed at the redirect target.
ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException;
retryAllowed = true;
brokerDetails = new BrokerDetails(brokerDetails);
brokerDetails.setHost(redirect.getHost());
brokerDetails.setPort(redirect.getPort());
_protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
}
else
{
// Ask the failover policy whether another attempt is permitted and which broker to use next;
// a fresh state manager is installed before the next attempt.
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
_protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
}
}
}
// Called whether or not the connect succeeded.
verifyClientID();
if (_logger.isDebugEnabled())
{
_logger.debug("Are we connected:" + isConnected());
}
if (!isConnected())
{
if (_logger.isDebugEnabled())
{
_logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
}
// Build the failure message, preferring the message of the cause over that of the exception itself.
String message = null;
if (connectionException != null)
{
if (connectionException.getCause() != null)
{
message = connectionException.getCause().getMessage();
}
else
{
message = connectionException.getMessage();
}
}
if (message == null)
{
message = "Unable to Connect";
}
else if("".equals(message))
{
// An empty message can only have come from a non-null connectionException.
message = "Unable to Connect:" + connectionException.getClass();
}
// Address resolution failures anywhere in the cause chain are reported with a dedicated exception type.
for (Throwable th = connectionException; th != null; th = th.getCause())
{
if (th instanceof UnresolvedAddressException ||
th instanceof UnknownHostException)
{
throw new AMQUnresolvedAddressException
(message,
_failoverPolicy.getCurrentBrokerDetails().toString(),
connectionException);
}
}
throw new AMQConnectionFailureException(message, connectionException);
}
if (_logger.isDebugEnabled())
{
_logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
}
_sessions.setMaxChannelID(_delegate.getMaxChannelID());
_sessions.setMinChannelID(_delegate.getMinChannelID());
}
/**
 * Replaces the current delegate with the one matching the given protocol version. The delegate class
 * is looked up reflectively by the naming scheme
 * {@code org.apache.qpid.client.AMQConnectionDelegate_<major>_<minor>} and constructed with this connection.
 * Afterwards the state manager's last exception is cleared and its state is set back to
 * CONNECTION_NOT_STARTED.
 *
 * @param pe the protocol version to switch to
 *
 * @throws AMQProtocolException if no delegate class exists for the requested version
 */
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
    final String delegateClassName = String.format("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
                                                   pe.getMajorVersion(), pe.getMinorVersion());
    try
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
        }

        final Class<?> delegateClass = Class.forName(delegateClassName);
        _delegate = (AMQConnectionDelegate) delegateClass.getConstructor(AMQConnection.class).newInstance(this);

        // For every version other than 0-10 the protocol session is told which version is now in use.
        if (!ProtocolVersion.v0_10.equals(_delegate.getProtocolVersion()))
        {
            _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
        }

        // reset state waiter state
        _protocolHandler.getStateManager().clearLastException();
        _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED);
    }
    catch (ClassNotFoundException e)
    {
        final String errorMessage = String.format("Protocol: %s.%s is required by the broker but is not " +
                                                  "currently supported by this client library implementation",
                                                  pe.getMajorVersion(), pe.getMinorVersion());
        throw new AMQProtocolException(errorMessage, e);
    }
    catch (InvocationTargetException e)
    {
        throw new RuntimeException("error invoking delegate", e);
    }
    catch (IllegalAccessException e)
    {
        throw new RuntimeException("error accessing delegate", e);
    }
    catch (InstantiationException e)
    {
        throw new RuntimeException("error instantiating delegate", e);
    }
    catch (NoSuchMethodException e)
    {
        throw new RuntimeException("unable to locate constructor for delegate", e);
    }
}
/**
 * Stores the virtual host name, dropping a single leading "/" if present. A null value is stored as null.
 *
 * @param virtualHost the virtual host as given in the connection URL, may be null
 */
private void setVirtualHost(String virtualHost)
{
    final boolean hasLeadingSlash = virtualHost != null && virtualHost.startsWith("/");
    _virtualHost = hasLeadingSlash ? virtualHost.substring(1) : virtualHost;
}
/**
 * Attempts to reconnect to an explicitly named broker. The target is a copy of the failover policy's current
 * broker details with host and port replaced; it is also installed as the policy's broker.
 *
 * @param host host to connect to
 * @param port port to connect to
 * @param useFailoverConfigOnFailure if true and the direct attempt fails, fall back to
 *                                   {@link #attemptReconnection()}, which walks the failover policy's brokers
 *
 * @return true if a connection was established, false otherwise
 */
public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure)
{
    BrokerDetails bd = new BrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
    bd.setHost(host);
    bd.setPort(port);
    _failoverPolicy.setBroker(bd);

    try
    {
        makeBrokerConnection(bd);
        return true;
    }
    catch (Exception e)
    {
        if (_logger.isInfoEnabled())
        {
            // Hand the exception to the logger so the reason for the failure is not lost,
            // as the other connection attempts in this class do.
            _logger.info("Unable to connect to broker at " + bd, e);
        }

        return useFailoverConfigOnFailure && attemptReconnection();
    }
}
/**
 * Walks the brokers offered by the failover policy, trying each in turn. Stops as soon as one connects,
 * the connection is closed or closing, the policy disallows further failover, or the policy has no
 * further broker to offer.
 *
 * @return true if a connection was established, false otherwise
 */
public boolean attemptReconnection()
{
    while (!isClosed() && !isClosing() && _failoverPolicy.failoverAllowed())
    {
        final BrokerDetails candidate = _failoverPolicy.getNextBrokerDetails();
        if (candidate == null)
        {
            break;
        }
        if (attemptConnection(candidate))
        {
            return true;
        }
    }

    // connection unsuccessful
    return false;
}
/**
 * Makes a single connection attempt against the given broker. Failures are logged at info level and never
 * propagate: a QpidException is logged by message only, any other exception with its stack trace.
 *
 * @param broker the broker to connect to
 *
 * @return true if the connection was made, false if the attempt threw
 */
private boolean attemptConnection(final BrokerDetails broker)
{
    try
    {
        makeBrokerConnection(broker);
        return true;
    }
    catch (Exception e)
    {
        if (_logger.isInfoEnabled())
        {
            if (e instanceof QpidException)
            {
                _logger.info(e.getMessage() + ":Unable to connect to broker at "
                             + _failoverPolicy.getCurrentBrokerDetails());
            }
            else
            {
                _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
            }
        }
        return false;
    }
}
/**
* Connects to the given broker by delegating to the current protocol delegate.
*
* @param brokerDetail the broker to connect to
*
* @return whatever the delegate returns; makeConnection() treats a non-null value as the protocol version
*         the delegate must be switched to
*
* @throws IOException
* @throws QpidException
*/
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, QpidException
{
return _delegate.makeBrokerConnection(brokerDetail);
}
/**
* Runs the given failover protected operation through the current protocol delegate.
*
* @param operation the operation to execute
*
* @return the value returned by the delegate
*
* @throws E as declared by the operation
*/
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
return _delegate.executeRetrySupport(operation);
}
/**
* Get the details of the currently active broker.
*
* @return the failover policy's current broker details. NOTE(review): the previous documentation stated that
* null is returned when no broker is active (i.e. no successful connection has been made); this method only
* forwards the policy's value - confirm against FailoverPolicy.
*/
public BrokerDetails getActiveBrokerDetails()
{
return _failoverPolicy.getCurrentBrokerDetails();
}
/**
 * Reports whether failover may be attempted. Failover is never allowed while the connection is not
 * connected; otherwise the decision is left to the failover policy.
 *
 * @return true if connected and the failover policy allows failover
 */
public boolean failoverAllowed()
{
    return isConnected() && _failoverPolicy.failoverAllowed();
}
/**
* Creates a session using this connection's configured maximum prefetch.
*
* @param transacted whether the session is transacted
* @param acknowledgeMode the acknowledge mode
*
* @return the new session
*
* @throws JMSException
*/
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
return createSession(transacted, acknowledgeMode, _maxPrefetch);
}
/**
* Creates a session using the same value for the high and low prefetch marks.
*
* @param transacted whether the session is transacted
* @param acknowledgeMode the acknowledge mode
* @param prefetch value used for both prefetchHigh and prefetchLow
*
* @return the new session
*
* @throws JMSException
*/
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
/**
 * Creates a session through the protocol delegate. Runs under the session creation lock. If no connection
 * attempt has been made yet (the connection was constructed without a client name) the connection is
 * established first, and where the delegate supports it the virtual host properties are fetched once
 * before the session is created.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode the acknowledge mode
 * @param prefetchHigh high prefetch mark
 * @param prefetchLow low prefetch mark
 *
 * @return the new session
 *
 * @throws JMSException if the connection is closed, cannot be established, or session creation fails
 */
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                 final int prefetchHigh, final int prefetchLow) throws JMSException
{
    synchronized (_sessionCreationLock)
    {
        checkNotClosed();

        if (!_connectionAttempted)
        {
            try
            {
                makeConnection();
            }
            catch (QpidException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException("Unable to establish connection"),e);
            }
        }

        final boolean vhostPropertiesOutstanding =
                _delegate.isVirtualHostPropertiesSupported() && !_virtualHostPropertiesPopulated;
        if (vhostPropertiesOutstanding)
        {
            retrieveVirtualHostPropertiesIfNecessary();
        }

        final org.apache.qpid.jms.Session created =
                _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
        return created;
    }
}
/**
 * Fetches the virtual host properties from the broker once per connection. A temporary session consumes a
 * single message from the "$virtualhostProperties" address and copies every message property, as a string,
 * into {@code _virtualHostProperties}. If no message arrives within the protocol handler's default timeout
 * the map is left as it is, but the properties are still marked as populated.
 * <p>
 * The temporary session is closed on every path. Previously it leaked whenever consumer creation, session
 * start or the receive threw.
 *
 * @throws JMSException if the session cannot be created, started, read from or closed
 */
private void retrieveVirtualHostPropertiesIfNecessary() throws JMSException
{
    synchronized (_virtualHostProperties)
    {
        // Re-check under the lock; another thread may have populated the properties meanwhile.
        if (_virtualHostPropertiesPopulated)
        {
            return;
        }

        final Session session = _delegate.createSession(false, AMQSession.NO_ACKNOWLEDGE, 3,3);
        try
        {
            final MessageConsumer consumer = session.createConsumer(session.createQueue(
                    "ADDR: $virtualhostProperties; {assert: never, create: never, node:{ type: queue }}"));
            try
            {
                ((AMQSession)session).start();
            }
            catch (QpidException e)
            {
                throw JMSExceptionHelper.chainJMSException(new JMSException(
                        "Failed to retrieve virtual host properties"), e);
            }

            Message propertiesMessage = consumer.receive(getProtocolHandler().getDefaultTimeout());
            if (propertiesMessage != null)
            {
                for (String property : Collections.list((Enumeration<String>) propertiesMessage.getPropertyNames()))
                {
                    _virtualHostProperties.put(property, propertiesMessage.getStringProperty(property));
                }
            }
        }
        catch (JMSException | RuntimeException e)
        {
            // Release the temporary session without letting a close failure hide the original problem.
            try
            {
                session.close();
            }
            catch (JMSException | RuntimeException closeFailure)
            {
                _logger.debug("Error closing virtual host properties session after failure", closeFailure);
            }
            throw e;
        }

        // Success path: as before, a failure to close propagates and leaves the properties unmarked.
        session.close();
        _virtualHostPropertiesPopulated = true;
    }
}
/**
 * Returns a trust store built from certificates supplied by the broker at the given address, fetching and
 * caching it on first use. The certificates are read as byte arrays from a stream message, parsed as X.509
 * and stored under the aliases "1", "2", ... in a key store of the default type.
 * <p>
 * Changes from the previous behaviour: the consumer created for each lookup is now closed (one leaked per
 * call before), a session whose start failed is closed and not cached, and the start failure message names
 * the trust store rather than the virtual host properties.
 *
 * @param name the address to fetch the certificates from; also the cache key
 *
 * @return the trust store, or null if no message arrived within two seconds or the content could not be
 *         turned into a key store (the error is logged)
 *
 * @throws JMSException if the session or consumer cannot be created or started, or the receive fails
 */
public KeyStore getBrokerSuppliedTrustStore(final String name) throws JMSException
{
    synchronized(_brokerTrustStores)
    {
        if (!_brokerTrustStores.containsKey(name))
        {
            if (_brokerTrustStoreSession == null)
            {
                final Session newSession = _delegate.createSession(false, AMQSession.AUTO_ACKNOWLEDGE, 1, 1);
                try
                {
                    ((AMQSession) newSession).start();
                }
                catch (QpidException e)
                {
                    // Do not keep a session that never started; later calls would skip the start.
                    try
                    {
                        newSession.close();
                    }
                    catch (JMSException | RuntimeException closeFailure)
                    {
                        _logger.debug("Error closing trust store session after failed start", closeFailure);
                    }
                    throw JMSExceptionHelper.chainJMSException(new JMSException(
                            "Failed to retrieve broker supplied trust store '" + name + "'"), e);
                }
                _brokerTrustStoreSession = newSession;
            }

            final MessageConsumer consumer = _brokerTrustStoreSession.createConsumer(_brokerTrustStoreSession.createQueue(
                    "ADDR: " + name + "; {assert: never, create: never, node:{ type: queue }}"));
            try
            {
                final Message message = consumer.receive(2000L);
                if (message != null)
                {
                    StreamMessage streamMessage = (StreamMessage) message;
                    List<X509Certificate> certs = new ArrayList<>();
                    try
                    {
                        try
                        {
                            final CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
                            byte[] bytes;
                            while ((bytes = (byte[]) streamMessage.readObject()) != null)
                            {
                                certs.add((X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(
                                        bytes)));
                            }
                        }
                        catch (MessageEOFException e)
                        {
                            // end of message
                        }

                        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
                        char[] encryptionTrustStorePassword =
                                getConnectionSettings().getEncryptionTrustStorePassword() == null
                                        ? null
                                        : getConnectionSettings().getEncryptionTrustStorePassword().toCharArray();
                        // Loading from a null stream initialises an empty key store.
                        keyStore.load(null, encryptionTrustStorePassword);
                        int i = 1;
                        for (X509Certificate cert : certs)
                        {
                            keyStore.setCertificateEntry(String.valueOf(i++), cert);
                        }
                        _brokerTrustStores.put(name, keyStore);
                    }
                    catch (JMSException | GeneralSecurityException | IOException e)
                    {
                        // Best effort: the failure is logged and the caller receives null.
                        _logger.error(e.getMessage(), e);
                    }
                }
            }
            finally
            {
                // Always release the per-lookup consumer; a close failure must not discard the result.
                try
                {
                    consumer.close();
                }
                catch (JMSException e)
                {
                    _logger.warn("Failed to close consumer for broker supplied trust store " + name, e);
                }
            }
        }
        return _brokerTrustStores.get(name);
    }
}
/**
* Replaces the failover policy used by this connection.
*
* @param policy the new failover policy
*/
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
}
/**
* @return the failover policy currently used by this connection
*/
public FailoverPolicy getFailoverPolicy()
{
return _failoverPolicy;
}
/**
 * Creates a session with this connection's default prefetch and wraps it in an AMQQueueSessionAdaptor,
 * which throws IllegalStateExceptions where specified in the JMS spec.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode the acknowledge mode
 *
 * @return QueueSession
 *
 * @throws JMSException
 */
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
{
    final org.apache.qpid.jms.Session wrapped = createSession(transacted, acknowledgeMode);
    return new AMQQueueSessionAdaptor(wrapped);
}
/**
 * Creates a session with this connection's default prefetch and wraps it in an AMQTopicSessionAdaptor,
 * which throws IllegalStateExceptions where specified in the JMS spec.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode the acknowledge mode
 *
 * @return TopicSession
 *
 * @throws JMSException
 */
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
{
    final org.apache.qpid.jms.Session wrapped = createSession(transacted, acknowledgeMode);
    return new AMQTopicSessionAdaptor(wrapped);
}
/**
 * @return true if the number of sessions held by this connection has reached the maximum channel count
 */
public boolean channelLimitReached()
{
    final long openChannels = _sessions.size();
    return openChannels >= _maximumChannelCount;
}
/**
* @return the client id of this connection; may be null if none was supplied and no connection attempt
*         has been made yet
*
* @throws JMSException raised by checkNotClosed(), presumably when the connection is closed
*/
public String getClientID() throws JMSException
{
checkNotClosed();
return _clientName;
}
/**
 * Sets the client id. This is only possible before the first connection attempt. In AMQP the client id
 * cannot be changed afterwards (one is generated automatically if none was supplied), so a later call
 * fails, unless the system property named by ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME is true, in
 * which case the call is logged and ignored.
 *
 * @param clientID the client id to use
 *
 * @throws IllegalStateException if a connection attempt has already been made and ignoring is not enabled
 * @throws JMSException raised by checkNotClosed(), presumably when the connection is closed
 */
public void setClientID(String clientID) throws JMSException
{
    checkNotClosed();

    synchronized (_sessionCreationLock)
    {
        if (!_connectionAttempted)
        {
            _clientName = clientID;
            return;
        }

        if (Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
        {
            _logger.info("Operation setClientID is ignored using ID: " + getClientID());
            return;
        }

        throw new IllegalStateException("Client name cannot be changed after being set");
    }
}
/**
* @return the connection meta data created in the constructor
*
* @throws JMSException raised by checkNotClosed(), presumably when the connection is closed
*/
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
return _connectionMetaData;
}
/**
* Returns the exception listener without checking whether the connection is closed.
*
* @return the current exception listener, possibly null
*/
protected final ExceptionListener getExceptionListenerNoCheck()
{
return _exceptionListener;
}
/**
* @return the current exception listener, possibly null
*
* @throws JMSException raised by checkNotClosed(), presumably when the connection is closed
*/
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
return getExceptionListenerNoCheck();
}
/**
* Sets the exception listener for this connection.
*
* @param listener the listener to install
*
* @throws JMSException raised by checkNotClosed(), presumably when the connection is closed
*/
public void setExceptionListener(ExceptionListener listener) throws JMSException
{
checkNotClosed();
_exceptionListener = listener;
}
/**
 * Start the connection, i.e. start flowing messages, by starting every session of this connection. Does
 * nothing if the connection is already started. Note that this method must be called only from a single
 * thread and is not thread safe (which is legal according to the JMS specification).
 * <p>
 * The started flag is set before the sessions are started, so it remains set even if a session fails.
 *
 * @throws JMSException if the connection is closed or a session fails to start
 */
public void start() throws JMSException
{
    checkNotClosed();
    if (_started)
    {
        return;
    }

    _started = true;
    for (final Object entry : _sessions.values())
    {
        try
        {
            ((AMQSession) entry).start();
        }
        catch (QpidException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.start failed"), e);
        }
    }
}
/**
 * Stops message delivery by stopping every session of this connection. Does nothing if the connection is
 * not started. The started flag is cleared only after all sessions stopped without error.
 *
 * @throws JMSException if the connection is closed or a session fails to stop
 */
public void stop() throws JMSException
{
    checkNotClosed();
    if (!_started)
    {
        return;
    }

    for (final Object entry : _sessions.values())
    {
        try
        {
            ((AMQSession) entry).stop();
        }
        catch (QpidException e)
        {
            throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.stop failed."), e);
        }
    }
    _started = false;
}
/**
 * Closes the connection using {@code DEFAULT_CLOSE_TIMEOUT} as the timeout.
 *
 * @throws JMSException if closing fails
 */
public void close() throws JMSException
{
    final long timeout = DEFAULT_CLOSE_TIMEOUT;
    close(timeout);
}
/**
 * Closes the connection with the given timeout. The closed flag is set under the session creation lock;
 * if the connection was not already closed, all sessions and the underlying connection are closed while
 * the closing flag is raised. The task pool is shut down in every case.
 *
 * @param timeout timeout passed on to the session and protocol close operations
 * @throws JMSException if closing the sessions fails
 */
private void close(long timeout) throws JMSException
{
    final boolean alreadyClosed;
    synchronized (_sessionCreationLock)
    {
        alreadyClosed = setClosed();
    }

    try
    {
        if (alreadyClosed)
        {
            return;
        }

        final List<AMQSession> sessionSnapshot = new ArrayList<>(_sessions.values());
        setClosing(true);
        try
        {
            doClose(sessionSnapshot, timeout);
        }
        finally
        {
            setClosing(false);
        }
    }
    finally
    {
        // Runs on every path, including the already-closed one.
        shutdownTaskPool();
    }
}
/**
 * Closes all sessions and the underlying protocol connection while holding the message delivery lock of
 * every session in the supplied list.
 * <p>
 * The method recurses once per session: each level removes the head of the list, locks that session's
 * message delivery and recurses on the remainder, so that the base case (empty list) runs with all locks
 * held. The locks are released in reverse order as the recursion unwinds.
 *
 * @param sessions the sessions still to be locked; this list is consumed (emptied) by the call
 * @param timeout  timeout passed to the session and protocol close operations
 * @throws JMSException if closing the sessions fails
 */
private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
{
if (!sessions.isEmpty())
{
// Recursive step: lock the next session, then continue with the rest of the list.
AMQSession session = sessions.remove(0);
session.lockMessageDelivery();
try
{
doClose(sessions, timeout);
}
finally
{
session.unlockMessageDelivery();
}
}
else
{
// Base case: every session's delivery lock is held; now take the failover mutex and close.
synchronized (getFailoverMutex())
{
try
{
closeAllSessions(null, timeout);
}
catch (JMSException e)
{
_logger.warn("Error closing connection", e);
throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e);
}
finally
{
// The protocol connection is closed even when closing the sessions failed; a failure here is
// only logged so that it cannot mask an exception from the session close above.
try
{
_delegate.closeConnection(timeout);
}
catch (Exception e)
{
_logger.warn("Error closing underlying protocol connection", e);
}
}
}
}
}
/** Requests shutdown of the task pool used for connection tasks. */
private void shutdownTaskPool()
{
    this._taskPool.shutdown();
}
/**
 * Marks all sessions and their children as closed without sending any protocol messages, then empties the
 * session registry. Useful when objects "visible" in userland need to be marked as closed after failover
 * or another significant event that impacts the connection.
 * <p>
 * The caller must hold the failover mutex before calling this method.
 */
private void markAllSessionsClosed()
{
    // Iterate over a snapshot so the registry can be cleared afterwards regardless of what markClosed does.
    final List<AMQSession> snapshot = new ArrayList<>(_sessions.values());
    for (AMQSession session : snapshot)
    {
        session.markClosed();
    }
    _sessions.clear();
}
/**
 * Closes all the sessions, either due to normal connection closure or due to an error occurring, and then
 * empties the session registry.
 * <p>
 * When {@code cause} is not null every session is notified through {@code closed(cause)}. Otherwise every
 * session is closed with the given timeout; a failure on one session does not prevent the remaining
 * sessions from being closed. The first failure is rethrown once all sessions have been processed, and any
 * further failures are attached to it as suppressed exceptions so that none is lost.
 * <p>
 * The caller must hold the failover mutex before calling this method.
 *
 * @param cause   if not null, the error that is causing this shutdown
 * @param timeout timeout passed to each session close; unused when {@code cause} is not null
 * @throws JMSException the first exception raised while closing a session, if any
 */
private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
    // Work on a snapshot so the loop is unaffected by changes to the registry while sessions close.
    final List<AMQSession> snapshot = new ArrayList<>(_sessions.values());
    JMSException firstFailure = null;
    for (AMQSession session : snapshot)
    {
        if (cause != null)
        {
            session.closed(cause);
            continue;
        }

        try
        {
            session.close(timeout);
        }
        catch (JMSException e)
        {
            // Pass the throwable as well so the stack trace reaches the log.
            _logger.warn("Error closing session: " + e, e);
            if (firstFailure == null)
            {
                firstFailure = e;
            }
            else if (e != firstFailure)
            {
                // addSuppressed rejects self-suppression, hence the identity check above.
                firstFailure.addSuppressed(e);
            }
        }
    }

    _sessions.clear();
    if (firstFailure != null)
    {
        throw firstFailure;
    }
}
/**
 * Not implemented: after checking that the connection is not closed, this always throws
 * {@code JmsNotImplementedException}.
 *
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
checkNotClosed();
throw new JmsNotImplementedException();
}
/**
 * Not implemented: after checking that the connection is not closed, this always throws
 * {@code JmsNotImplementedException}.
 *
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException
{
checkNotClosed();
throw new JmsNotImplementedException();
}
/**
 * Not implemented: after checking that the connection is not closed, this always throws
 * {@code JmsNotImplementedException}.
 *
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException
{
checkNotClosed();
throw new JmsNotImplementedException();
}
/**
 * Not implemented: after checking that the connection is not closed, this always throws
 * {@code JmsNotImplementedException}.
 *
 * @throws JMSException if the connection is closed
 */
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
checkNotClosed();
throw new JmsNotImplementedException();
}
/**
 * Returns the maximum channel count held by this connection.
 *
 * @return the maximum channel count
 * @throws JMSException if the connection is closed
 */
public long getMaximumChannelCount() throws JMSException
{
    checkNotClosed();
    return this._maximumChannelCount;
}
/**
 * Registers the listener that receives byte-count and failover notifications from this connection.
 *
 * @param listener the listener to register; replaces any previous listener
 */
public void setConnectionListener(ConnectionListener listener)
{
    this._connectionListener = listener;
}
/**
 * Returns the registered connection listener.
 *
 * @return the connection listener, or {@code null} if none has been set
 */
public ConnectionListener getConnectionListener()
{
    return this._connectionListener;
}
/**
 * Stores the maximum channel count for this connection.
 *
 * @param maximumChannelCount the new maximum channel count
 */
public void setMaximumChannelCount(long maximumChannelCount)
{
    this._maximumChannelCount = maximumChannelCount;
}
/**
 * Stores the maximum frame size for this connection.
 *
 * @param frameMax the new maximum frame size
 */
public void setMaximumFrameSize(long frameMax)
{
    this._maximumFrameSize = frameMax;
}
/**
 * Returns the maximum frame size held by this connection.
 *
 * @return the maximum frame size
 */
public long getMaximumFrameSize()
{
    return this._maximumFrameSize;
}
/**
 * Returns the registry that maps channel ids to sessions. The internal instance itself is returned, not a
 * copy.
 *
 * @return the channel-to-session map of this connection
 */
public ChannelToSessionMap getSessions()
{
    return this._sessions;
}
/**
 * Returns the user name held by this connection.
 *
 * @return the user name
 */
public String getUsername()
{
    return this._username;
}
/**
 * Stores the user name for this connection.
 *
 * @param id the user name
 */
public void setUsername(String id)
{
    this._username = id;
}
/**
 * Returns the password held by this connection.
 *
 * @return the password
 */
public String getPassword()
{
    return this._password;
}
/**
 * Returns the virtual host name held by this connection.
 *
 * @return the virtual host
 */
public String getVirtualHost()
{
    return this._virtualHost;
}
/**
 * Returns the protocol handler used by this connection.
 *
 * @return the protocol handler
 */
public final AMQProtocolHandler getProtocolHandler()
{
    return this._protocolHandler;
}
/**
 * Reports the current value of the started flag, which is raised by {@code start()} and cleared by
 * {@code stop()}.
 *
 * @return {@code true} if the connection is currently started
 */
public final boolean started()
{
    return this._started;
}
/**
 * Reports the current value of the connected flag.
 *
 * @return {@code true} if the connection is flagged as connected
 */
public final boolean isConnected()
{
    return this._connected;
}
/**
 * Updates the connected flag.
 *
 * @param connected the new value of the flag
 */
protected final void setConnected(boolean connected)
{
    this._connected = connected;
}
/**
 * Forwards the number of bytes written to the registered connection listener, if there is one.
 *
 * @param writtenBytes the byte count to report
 */
public void bytesSent(long writtenBytes)
{
    final ConnectionListener listener = _connectionListener;
    if (listener == null)
    {
        return;
    }
    listener.bytesSent(writtenBytes);
}
/**
 * Forwards the number of bytes received to the registered connection listener, if there is one.
 *
 * @param receivedBytes the byte count to report
 */
public void bytesReceived(long receivedBytes)
{
    final ConnectionListener listener = _connectionListener;
    if (listener == null)
    {
        return;
    }
    listener.bytesReceived(receivedBytes);
}
/**
 * Records the current time as the last failover time and fires the preFailover event to the registered
 * connection listener (if any).
 *
 * @param redirect true if this is the result of a redirect request rather than a connection error
 *
 * @return true if there is no listener or the listener does not veto the change
 */
public boolean firePreFailover(boolean redirect)
{
    _lastFailoverTime = System.currentTimeMillis();
    final ConnectionListener listener = _connectionListener;
    return listener == null || listener.preFailover(redirect);
}
/**
 * Fires the preResubscribe event to the registered connection listener (if any). If the listener vetoes
 * resubscription then all the sessions are marked as closed.
 *
 * @return true if there is no listener or the listener does not veto resubscription
 *
 * @throws JMSException declared for callers; not raised by the code in this method itself
 */
public boolean firePreResubscribe() throws JMSException
{
    final ConnectionListener listener = _connectionListener;
    if (listener == null)
    {
        return true;
    }

    final boolean resubscribe = listener.preResubscribe();
    if (!resubscribe)
    {
        markAllSessionsClosed();
    }
    return resubscribe;
}
/** Notifies the registered connection listener (if any) that failover has completed. */
public void fireFailoverComplete()
{
    final ConnectionListener listener = _connectionListener;
    if (listener == null)
    {
        return;
    }
    listener.failoverComplete();
}
/**
 * Returns the "failover mutex". To protect the consistency of the connection and its child sessions,
 * consumers and producers, this mutex must be held while performing any operation that could be corrupted
 * during failover.
 *
 * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
 */
public final Object getFailoverMutex()
{
    return this._failoverMutex;
}
/**
 * Asks the protocol delegate to resubscribe the sessions of this connection.
 *
 * @throws JMSException      propagated from the delegate
 * @throws QpidException     propagated from the delegate
 * @throws FailoverException propagated from the delegate
 */
public void resubscribeSessions() throws JMSException, QpidException, FailoverException
{
    this._delegate.resubscribeSessions();
}
/**
 * Blocks until any failover in progress has completed; returns immediately when no failover is taking
 * place. The wait is delegated to the protocol handler.
 *
 * @throws InterruptedException if the wait in the protocol handler is interrupted
 */
public void blockUntilNotFailingOver() throws InterruptedException
{
    this._protocolHandler.blockUntilNotFailingOver();
}
/**
 * Invoked by the AMQProtocolSession when a protocol session exception has occurred. The exception is
 * converted to a JMS exception and handed to the JMS exception listener, if configured. When the cause is
 * a hard error the sessions are closed first, which in turn propagates the error to consumers so that
 * synchronous consumers have the exception thrown to them.
 *
 * @param cause the exception
 */
public void exceptionReceived(Throwable cause)
{
    if (_logger.isDebugEnabled())
    {
        final String threadName = Thread.currentThread().getName();
        _logger.debug("exceptionReceived done by:" + threadName, cause);
    }

    final JMSException jmsException = convertToJMSException(cause);
    try
    {
        final boolean fatal = hardError(cause);
        if (fatal)
        {
            closeSessions(cause);
        }
    }
    finally
    {
        // The listener is informed even if closing the sessions threw.
        deliverJMSExceptionToExceptionListenerOrLog(jmsException, cause);
    }
}
/**
 * Converts an arbitrary throwable into a {@code JMSException}.
 * <ul>
 * <li>A {@code JMSException} is returned as is.</li>
 * <li>An {@code AMQException} with a non-zero error code becomes a JMS exception carrying that code as its
 * error code string.</li>
 * <li>Anything else becomes a JMS exception without an error code. For an
 * {@code AMQDisconnectedException}, the last exception recorded by the state manager (if any) replaces the
 * cause.</li>
 * </ul>
 *
 * @param cause the throwable to convert
 * @return the resulting JMS exception
 */
private JMSException convertToJMSException(Throwable cause)
{
    if (cause instanceof JMSException)
    {
        return (JMSException) cause;
    }

    final int errorCode = (cause instanceof AMQException) ? ((AMQException) cause).getErrorCode() : 0;
    if (errorCode != 0)
    {
        final String message = "Exception thrown against " + toString() + ": " + cause;
        return JMSExceptionHelper.chainJMSException(new JMSException(message, Integer.toString(errorCode)),
                                                    cause);
    }

    // Should never get here as all AMQEs are required to have an ErrorCode!
    // Other than AMQDisconnectedEx!
    Throwable effectiveCause = cause;
    if (cause instanceof AMQDisconnectedException)
    {
        final Exception lastRecorded = _protocolHandler.getStateManager().getLastException();
        if (lastRecorded != null)
        {
            _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
            effectiveCause = lastRecorded;
        }
    }
    final String message = "Exception thrown against " + toString() + ": " + effectiveCause;
    return JMSExceptionHelper.chainJMSException(new JMSException(message), effectiveCause);
}
/**
 * Handles closure of the connection caused by the given error: the protocol session is notified of the
 * error, the connection is flagged as closed and, if it was not already flagged, the sessions are closed.
 * The converted exception is delivered to the exception listener (or logged) in every case.
 *
 * @param cause the error that closed the connection
 */
void closed(Throwable cause)
{
    _logger.debug("Closing closed connection {} ", this.toString());
    final JMSException jmsException = convertToJMSException(cause);
    try
    {
        _protocolHandler.getProtocolSession().notifyError(jmsException);

        // If this call is the one closing the connection, the sessions are closed first.
        final boolean alreadyClosed = setClosed();
        if (!alreadyClosed)
        {
            closeSessions(cause);
        }
    }
    finally
    {
        deliverJMSExceptionToExceptionListenerOrLog(jmsException, cause);
    }
}
/**
 * Closes all sessions because of the given error while holding the failover mutex. A failure to close the
 * sessions is logged and not propagated.
 *
 * @param cause the error passed on to the sessions
 */
private void closeSessions(Throwable cause)
{
    // The failover mutex must be held before trying to close.
    final Object failoverMutex = getFailoverMutex();
    synchronized (failoverMutex)
    {
        try
        {
            closeAllSessions(cause, -1);
        }
        catch (JMSException e)
        {
            _logger.error("Error closing all sessions: " + e, e);
        }
    }
}
/**
 * Hands the JMS exception to the registered exception listener via a connection task, or logs the original
 * cause when no listener is registered. Runtime exceptions thrown by the listener are logged and
 * swallowed.
 *
 * @param je    the JMS exception to deliver
 * @param cause the original throwable, used for logging when there is no listener
 */
private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause)
{
    final ExceptionListener listener = getExceptionListenerNoCheck();
    if (listener == null)
    {
        _logger.error("Throwable Received but no listener set: " + cause);
        return;
    }

    final Runnable notification = new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                listener.onException(je);
            }
            catch (RuntimeException e)
            {
                _logger.error("Exception occurred in ExceptionListener", e);
            }
        }
    };
    performConnectionTask(notification);
}
/**
 * Decides whether the given throwable is a hard error. An {@code AMQException} answers for itself through
 * {@code isHardError()}; every other throwable is treated as a hard error.
 *
 * @param cause the throwable to classify
 * @return {@code true} if the throwable is a hard error
 */
private boolean hardError(Throwable cause)
{
    if (!(cause instanceof AMQException))
    {
        return true;
    }
    final AMQException amqException = (AMQException) cause;
    return amqException.isHardError();
}
/**
 * Registers a session under the given channel id.
 *
 * @param channelId the channel id used as the key
 * @param session   the session to register
 */
void registerSession(int channelId, AMQSession session)
{
    this._sessions.put(channelId, session);
}
/**
 * Removes the session registered under the given channel id.
 *
 * @param channelId the channel id whose session is removed
 */
public void deregisterSession(int channelId)
{
    this._sessions.remove(channelId);
}
/**
 * Returns a multi-line description of this connection: the current broker host and port (or a note that
 * there is no active broker connection), the virtual host, the client id and the number of registered
 * sessions.
 *
 * @return the textual description
 */
@Override
public String toString()
{
    // No shared state is appended to, so the unsynchronized builder is sufficient.
    final StringBuilder buf = new StringBuilder("AMQConnection:\n");

    // Read the broker details once so the null check and the use refer to the same object.
    final BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
    if (bd == null)
    {
        buf.append("No active broker connection");
    }
    else
    {
        buf.append("Host: ").append(String.valueOf(bd.getHost()));
        buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
    }
    buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
    buf.append("\nClient ID: ").append(String.valueOf(_clientName));
    buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
    return buf.toString();
}
/**
 * Returns the connection url of this connection.
 *
 * @return connection url
 */
public ConnectionURL getConnectionURL()
{
    return this._connectionURL;
}
/**
 * Returns the connection url in string form. This string is suitable only for display
 * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
 *
 * @return stringified connection url
 */
public String toURL()
{
    final String displayUrl = _connectionURL.toString();
    return displayUrl;
}
/**
 * Builds the JNDI reference for this connection. The reference carries the stringified connection url
 * under the {@code JNDI_ADDRESS_CONNECTION_URL} address type and names {@code ObjectFactory} as its
 * factory, with no factory location.
 *
 * @return the JNDI reference
 * @throws NamingException declared by the interface; not raised by the code in this method itself
 */
public Reference getReference() throws NamingException
{
    final StringRefAddr urlAddress = new StringRefAddr(JNDI_ADDRESS_CONNECTION_URL, toURL());
    final String className = AMQConnection.class.getName();
    final String factoryName = ObjectFactory.class.getName();
    return new Reference(className, urlAddress, factoryName, null); // null: no factory location
}
/**
 * Returns the default topic exchange name held by this connection.
 *
 * @return the default topic exchange name
 */
public String getDefaultTopicExchangeName()
{
    return this._defaultTopicExchangeName;
}
/**
 * Stores the default topic exchange name.
 *
 * @param defaultTopicExchangeName the new default topic exchange name
 */
public void setDefaultTopicExchangeName(String defaultTopicExchangeName)
{
    this._defaultTopicExchangeName = defaultTopicExchangeName;
}
/**
 * Returns the default queue exchange name held by this connection.
 *
 * @return the default queue exchange name
 */
public String getDefaultQueueExchangeName()
{
    return this._defaultQueueExchangeName;
}
/**
 * Stores the default queue exchange name.
 *
 * @param defaultQueueExchangeName the new default queue exchange name
 */
public void setDefaultQueueExchangeName(String defaultQueueExchangeName)
{
    this._defaultQueueExchangeName = defaultQueueExchangeName;
}
/**
 * Returns the temporary topic exchange name held by this connection.
 *
 * @return the temporary topic exchange name
 */
public String getTemporaryTopicExchangeName()
{
    return this._temporaryTopicExchangeName;
}
/**
 * Returns the temporary queue exchange name held by this connection.
 *
 * @return the temporary queue exchange name
 */
public String getTemporaryQueueExchangeName()
{
    return this._temporaryQueueExchangeName;
}
/**
 * Stores the temporary topic exchange name.
 *
 * @param temporaryTopicExchangeName the new temporary topic exchange name
 */
public void setTemporaryTopicExchangeName(String temporaryTopicExchangeName)
{
    this._temporaryTopicExchangeName = temporaryTopicExchangeName;
}
/**
 * Stores the temporary queue exchange name.
 *
 * @param temporaryQueueExchangeName the new temporary queue exchange name
 */
public void setTemporaryQueueExchangeName(String temporaryQueueExchangeName)
{
    this._temporaryQueueExchangeName = temporaryQueueExchangeName;
}
/**
 * Submits a task to the connection's task pool. If the pool rejects the task while the connection is
 * closed or closing, the rejection is silently ignored; otherwise it is rethrown.
 *
 * @param task the task to execute
 * @throws RejectedExecutionException if the pool rejects the task while the connection is neither closed
 *                                    nor closing
 */
public void performConnectionTask(Runnable task)
{
    try
    {
        _taskPool.execute(task);
    }
    catch (RejectedExecutionException rejected)
    {
        // A rejection is expected once the pool has been shut down as part of closing.
        final boolean shuttingDown = isClosed() || isClosing();
        if (!shuttingDown)
        {
            throw rejected;
        }
    }
}
/**
 * Schedules a task on the connection's task pool to run repeatedly at a fixed rate.
 *
 * @param task         the task to run
 * @param initialDelay delay before the first execution
 * @param period       period between successive executions
 * @param timeUnit     unit of {@code initialDelay} and {@code period}
 * @return the future representing the scheduled task
 */
ScheduledFuture<?> scheduleTask(Runnable task, long initialDelay, long period, TimeUnit timeUnit)
{
    final ScheduledFuture<?> scheduled = _taskPool.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
    return scheduled;
}
/**
 * Looks up the session registered under the given channel id.
 *
 * @param channelId the channel id to look up
 * @return the result of the lookup in the session registry
 */
public AMQSession getSession(int channelId)
{
    return this._sessions.get(channelId);
}
/**
 * Returns the protocol version reported by the protocol delegate.
 *
 * @return the protocol version
 */
public ProtocolVersion getProtocolVersion()
{
    return this._delegate.getProtocolVersion();
}
/**
 * Returns the UUID exposed by the 0-10 protocol delegate.
 *
 * @return the UUID when the protocol version is 0-10, otherwise {@code null}
 */
public String getBrokerUUID()
{
    if (!getProtocolVersion().equals(ProtocolVersion.v0_10))
    {
        return null;
    }
    final AMQConnectionDelegate_0_10 zeroTenDelegate = (AMQConnectionDelegate_0_10) _delegate;
    return zeroTenDelegate.getUUID();
}
/**
 * Tests whether the Broker has advertised support for the named feature. The question is answered by the
 * protocol delegate.
 *
 * @param featureName name of the feature to test
 *
 * @return true if the feature is supported, or false otherwise.
 */
boolean isSupportedServerFeature(final String featureName)
{
    return this._delegate.isSupportedServerFeature(featureName);
}
/**
 * Reports whether failover is in progress, which is taken to be the case while the protocol handler has a
 * failover latch.
 *
 * @return {@code true} if the protocol handler currently holds a failover latch
 */
public boolean isFailingOver()
{
    final Object failoverLatch = _protocolHandler.getFailoverLatch();
    return failoverLatch != null;
}
/**
 * Returns the maximum number of messages that this connection can pre-fetch.
 *
 * @return the maximum pre-fetch count
 */
public long getMaxPrefetch()
{
    return this._maxPrefetch;
}
/**
 * Indicates whether persistent messages are synchronized.
 *
 * @return true if persistent messages are synchronized, false otherwise
 */
public boolean getSyncPersistence()
{
    return this._syncPersistence;
}
/**
 * Indicates whether a sync is needed on every message ack.
 *
 * @return the sync-ack flag
 */
public boolean getSyncAck()
{
    return this._syncAck;
}
/**
 * Returns the sync-client-ack flag held by this connection.
 *
 * @return the sync-client-ack flag
 */
boolean getSyncClientAck()
{
    return this._syncClientAck;
}
/**
 * Returns the sync-publish setting held by this connection.
 *
 * @return the sync-publish setting
 */
public String getSyncPublish()
{
    return this._syncPublish;
}
/**
 * Returns the populate-user-id flag held by this connection.
 *
 * @return the populate-user-id flag
 */
public boolean isPopulateUserId()
{
    return this._populateUserId;
}
/**
 * Returns the compress-messages flag held by this connection.
 *
 * @return {@code true} if message compression is desired
 */
public boolean isMessageCompressionDesired()
{
    return this._compressMessages;
}
/**
 * Obtains the next channel id from the session registry.
 *
 * @return the next channel id
 */
public int getNextChannelID()
{
    return this._sessions.getNextChannelId();
}
/**
 * Returns the use-legacy-map-message-format flag held by this connection.
 *
 * @return the flag value
 */
public boolean isUseLegacyMapMessageFormat()
{
    return this._useLegacyMapMessageFormat;
}
/**
 * Returns the use-legacy-stream-message-format flag held by this connection.
 *
 * @return the flag value
 */
public boolean isUseLegacyStreamMessageFormat()
{
    return this._useLegacyStreamMessageFormat;
}
/**
 * Verifies the client id through the protocol delegate, but only when the system property named by
 * {@code ClientProperties.QPID_VERIFY_CLIENT_ID} is {@code true}.
 *
 * @throws QpidException if the delegate reports that verification failed (raised as an
 *                       {@code AMQException} with {@code ErrorCodes.ALREADY_EXISTS}), or if the delegate
 *                       throws a {@code JMSException}, which is wrapped
 */
private void verifyClientID() throws QpidException
{
    if (!Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
    {
        return;
    }

    try
    {
        final boolean verified = _delegate.verifyClientID();
        if (!verified)
        {
            throw new AMQException(ErrorCodes.ALREADY_EXISTS, "ClientID must be unique");
        }
    }
    catch (JMSException e)
    {
        throw new QpidException(e.getMessage(), e);
    }
}
/**
 * Returns the time recorded by the most recent {@code firePreFailover} call, as a
 * {@code System.currentTimeMillis()} value.
 *
 * @return the last failover time in milliseconds
 */
public long getLastFailoverTime()
{
    return this._lastFailoverTime;
}
/**
 * Returns the protocol delegate of this connection.
 *
 * @return the delegate
 */
protected AMQConnectionDelegate getDelegate()
{
    return this._delegate;
}
/**
 * Returns the number assigned to this connection.
 *
 * @return the connection number
 */
public Long getConnectionNumber()
{
    return this._connectionNumber;
}
/**
 * Logs, at info level, that this connection is now connected between the given addresses.
 *
 * @param localAddress  the local end of the connection
 * @param remoteAddress the remote end of the connection
 */
protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress)
{
    if (!_logger.isInfoEnabled())
    {
        return;
    }
    final String message = "Connection " + _connectionNumber + " now connected from "
                           + localAddress + " to " + remoteAddress;
    _logger.info(message);
}
/**
 * Passes the heartbeat listener on to the protocol delegate.
 *
 * @param listener the heartbeat listener
 */
void setHeartbeatListener(HeartbeatListener listener)
{
    this._delegate.setHeartbeatListener(listener);
}
/**
 * Returns the validate-queue-on-send flag held by this connection.
 *
 * @return the flag value
 */
public boolean validateQueueOnSend()
{
    return this._validateQueueOnSend;
}
/**
 * Returns the message compression threshold size held by this connection.
 *
 * @return the threshold size
 */
public int getMessageCompressionThresholdSize()
{
    return this._messageCompressionThresholdSize;
}
/**
 * Runs the given action while holding the locks of every currently registered session and the failover
 * mutex.
 *
 * @param r the action to run
 */
void doWithAllLocks(Runnable r)
{
    // NOTE(review): the two-argument overload removes elements from this list, so values() presumably
    // returns a private mutable copy - confirm against ChannelToSessionMap.
    final List<AMQSession> sessions = _sessions.values();
    doWithAllLocks(r, sessions);
}
/**
 * Runs the given action while holding, for every session in the list, its dispatcher lock and its message
 * delivery lock, and finally the failover mutex.
 * <p>
 * The method recurses once per session: each level removes the head of the list, acquires that session's
 * locks and recurses on the remainder, so the action runs in the base case (empty list) with all locks
 * held. The locks are released in reverse order as the recursion unwinds.
 *
 * @param r        the action to run
 * @param sessions the sessions still to be locked; this list is consumed (emptied) by the call
 */
private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
{
if (!sessions.isEmpty())
{
// Recursive step: take the locks of the next session, then continue with the rest of the list.
AMQSession session = sessions.remove(0);
Object dispatcherLock = session.getDispatcherLock();
if (dispatcherLock == null)
{
dispatcherLock = new Object(); // use dummy intrinsic lock to make subsequent code nicer
}
// Within one session the dispatcher lock is taken before the message delivery lock.
synchronized (dispatcherLock)
{
session.lockMessageDelivery();
try
{
doWithAllLocks(r, sessions);
}
finally
{
session.unlockMessageDelivery();
}
}
}
else
{
// Base case: all session locks are held; the failover mutex is acquired last.
synchronized (getFailoverMutex())
{
r.run();
}
}
}
/**
 * Returns the prefix for temporary queue names, taken from the virtual host property
 * {@code virtualHost.temporaryQueuePrefix}.
 *
 * @return the prefix, or an empty string when the delegate does not support virtual host properties or
 *         the property is not set
 */
public String getTemporaryQueuePrefix()
{
    if (!_delegate.isVirtualHostPropertiesSupported())
    {
        return "";
    }
    final String configuredPrefix = getVirtualHostProperty("virtualHost.temporaryQueuePrefix");
    if (configuredPrefix == null)
    {
        return "";
    }
    return configuredPrefix;
}
/**
 * Looks up a virtual host property by name.
 *
 * @param propertyName the name of the property
 * @return the result of the lookup in the virtual host properties
 */
String getVirtualHostProperty(final String propertyName)
{
    return this._virtualHostProperties.get(propertyName);
}
/**
 * Stores the connection settings for this connection.
 *
 * @param connectionSettings the settings to store
 */
public void setConnectionSettings(final ConnectionSettings connectionSettings)
{
    this._connectionSettings = connectionSettings;
}
/**
 * Returns the connection settings held by this connection.
 *
 * @return the connection settings
 */
public ConnectionSettings getConnectionSettings()
{
    return this._connectionSettings;
}
/**
 * Decides whether the given class is trusted according to the configured black and white lists of class
 * hierarchies.
 * <p>
 * Array types are judged by their innermost component type and primitives are always trusted. Anonymous
 * and local classes are judged by their nearest enclosing class that is neither. The canonical name of the
 * resulting class is then matched against the lists, where an entry matches if it is {@code "*"}, equals
 * the name, or is a dot-separated prefix of the name. The black list is consulted first and wins; a class
 * matching neither list is not trusted.
 *
 * @param clazz the class to check; {@code null} is never trusted
 * @return {@code true} if the class is trusted
 */
@Override
public boolean isTrusted(Class<?> clazz)
{
    if (clazz == null)
    {
        // Fail closed rather than throwing on a missing class.
        return false;
    }

    Class<?> candidate = clazz;
    while (candidate.isArray())
    {
        candidate = candidate.getComponentType();
    }
    if (candidate.isPrimitive())
    {
        return true;
    }
    while (candidate != null && (candidate.isAnonymousClass() || candidate.isLocalClass()))
    {
        candidate = candidate.getEnclosingClass();
    }

    final String className = (candidate == null) ? null : candidate.getCanonicalName();
    if (className == null)
    {
        return false;
    }

    for (String blackListedClassHierarchy : _blackListedClassHierarchies)
    {
        if (matchesClassHierarchy(className, blackListedClassHierarchy))
        {
            return false;
        }
    }
    for (String whiteListedClassHierarchy : _whiteListedClassHierarchies)
    {
        if (matchesClassHierarchy(className, whiteListedClassHierarchy))
        {
            return true;
        }
    }
    return false;
}

/** Tells whether a list entry covers the class name: a wildcard, the exact name, or a package/outer-class prefix. */
private static boolean matchesClassHierarchy(final String className, final String hierarchy)
{
    return "*".equals(hierarchy)
           || className.equals(hierarchy)
           || className.startsWith(hierarchy + ".");
}
/**
 * Reports whether the protocol delegate supports virtual host properties.
 *
 * @return the delegate's answer
 */
boolean isVirtualHostPropertiesSupported()
{
    final AMQConnectionDelegate delegate = getDelegate();
    return delegate.isVirtualHostPropertiesSupported();
}
}