| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.client; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.lang.reflect.InvocationTargetException; |
| import java.net.InetAddress; |
| import java.net.SocketAddress; |
| import java.net.UnknownHostException; |
| import java.nio.channels.UnresolvedAddressException; |
| import java.security.GeneralSecurityException; |
| import java.security.KeyStore; |
| import java.security.cert.CertificateFactory; |
| import java.security.cert.X509Certificate; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.jms.ConnectionConsumer; |
| import javax.jms.ConnectionMetaData; |
| import javax.jms.Destination; |
| import javax.jms.ExceptionListener; |
| import javax.jms.IllegalStateException; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageEOFException; |
| import javax.jms.Queue; |
| import javax.jms.QueueSession; |
| import javax.jms.ServerSessionPool; |
| import javax.jms.StreamMessage; |
| import javax.jms.Topic; |
| import javax.jms.TopicSession; |
| import javax.naming.NamingException; |
| import javax.naming.Reference; |
| import javax.naming.Referenceable; |
| import javax.naming.StringRefAddr; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.AMQConnectionFailureException; |
| import org.apache.qpid.AMQDisconnectedException; |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.AMQProtocolException; |
| import org.apache.qpid.AMQUnresolvedAddressException; |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.failover.ConnectionRedirectException; |
| import org.apache.qpid.client.failover.FailoverException; |
| import org.apache.qpid.client.failover.FailoverProtectedOperation; |
| import org.apache.qpid.client.security.CallbackHandlerRegistry; |
| import org.apache.qpid.client.state.AMQState; |
| import org.apache.qpid.client.state.AMQStateManager; |
| import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream; |
| import org.apache.qpid.client.util.JMSExceptionHelper; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.configuration.CommonProperties; |
| import org.apache.qpid.exchange.ExchangeDefaults; |
| import org.apache.qpid.framing.ProtocolVersion; |
| import org.apache.qpid.jms.ConnectionListener; |
| import org.apache.qpid.jms.ConnectionURL; |
| import org.apache.qpid.jms.FailoverPolicy; |
| import org.apache.qpid.jms.Session; |
| import org.apache.qpid.jndi.ObjectFactory; |
| import org.apache.qpid.protocol.ErrorCodes; |
| import org.apache.qpid.transport.ConnectionSettings; |
| import org.apache.qpid.url.URLSyntaxException; |
| |
| public class AMQConnection extends Closeable implements CommonConnection, Referenceable, |
| ClassLoadingAwareObjectInputStream.TrustedClassFilter |
| { |
| public static final String JNDI_ADDRESS_CONNECTION_URL = "connectionURL"; |
| |
| private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); |
| |
// Class-initialisation hook: invokes ClientProperties.ensureIsLoaded() before the static fields
// declared after this block (e.g. DEFAULT_CLOSE_TIMEOUT, which reads a system property) are initialised.
static
{
    ClientProperties.ensureIsLoaded();
}
| |
| private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); |
| private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, |
| ClientProperties.DEFAULT_CLOSE_TIMEOUT); |
| |
| private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet(); |
| |
| private final List<String> _whiteListedClassHierarchies; |
| private final List<String> _blackListedClassHierarchies; |
| |
| /** |
| * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be |
| * held by any child objects of this connection such as the session, producers and consumers. |
| */ |
| private final Object _failoverMutex = new Object(); |
| |
| private final Object _sessionCreationLock = new Object(); |
| |
| /** |
| * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per connection |
| * and we must prevent the client from opening too many. |
| */ |
| private long _maximumChannelCount; |
| |
| /** The maximum size of frame supported by the server */ |
| private long _maximumFrameSize; |
| |
| /** |
| * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped |
| * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate |
| * handler. |
| */ |
| private final AMQProtocolHandler _protocolHandler; |
| |
| /** Maps from session id (Integer) to AMQSession instance */ |
| private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); |
| |
| private String _clientName; |
| |
| /** The user name to use for authentication */ |
| private String _username; |
| |
| /** The password to use for authentication */ |
| private String _password; |
| |
| /** The virtual path to connect to on the AMQ server */ |
| private String _virtualHost; |
| |
| /** The exception listener for this connection object. */ |
| private volatile ExceptionListener _exceptionListener; |
| |
| private ConnectionListener _connectionListener; |
| |
| private final ConnectionURL _connectionURL; |
| |
| /** |
| * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message |
| * publication. |
| */ |
| private volatile boolean _started; |
| |
| /** Policy dictating how to failover */ |
| private FailoverPolicy _failoverPolicy; |
| |
| /* |
| * _Connected should be refactored with a suitable wait object. |
| */ |
| private boolean _connected; |
| |
| private boolean _connectionAttempted; |
| |
| /* |
| * The connection meta data |
| */ |
| private QpidConnectionMetaData _connectionMetaData; |
| |
| private String _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; |
| private String _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; |
| private String _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; |
| private String _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; |
| |
| /** |
| * Thread Pool for executing connection level processes such as reporting asynchronous exceptions |
| * and for 0-8..0-91 returning bounced messages. |
| */ |
| private final ScheduledExecutorService _taskPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() |
| { |
| @Override |
| public Thread newThread(final Runnable r) |
| { |
| final String name = "Connection_" + AMQConnection.this._connectionNumber + "_task"; |
| _logger.debug("Creating connection pooled thread '{}'", name); |
| Thread thread = new Thread(r, name); |
| if (!thread.isDaemon()) |
| { |
| thread.setDaemon(true); |
| } |
| |
| return thread; |
| } |
| }); |
| |
| private AMQConnectionDelegate _delegate; |
| |
| // this connection maximum number of prefetched messages |
| private int _maxPrefetch; |
| |
| //Indicates whether persistent messages are synchronized |
| private boolean _syncPersistence; |
| |
| //Indicates whether we need to sync on every message ack |
| private boolean _syncAck; |
| |
| //Indicates whether we need to sync on every message ack on a client ack session, default to true |
| private final boolean _syncClientAck; |
| |
| //Indicates the sync publish options (persistent|all) |
| //By default it's async publish |
| private String _syncPublish = ""; |
| |
| //Indicates whether user-id should be attached to every sent message |
| //By default the user ID is attached |
| private boolean _populateUserId = true; |
| |
| // Indicates whether to use the old map message format or the |
| // new amqp-0-10 encoded format. |
| private boolean _useLegacyMapMessageFormat; |
| |
| // Indicates whether to use the old stream message format or the |
| // new amqp-0-10 list encoded format. |
| private boolean _useLegacyStreamMessageFormat; |
| |
| // When sending to a Queue destination for the first time, check that the queue is bound |
| private final boolean _validateQueueOnSend; |
| |
| //used to track the last failover time for |
| //Address resolution purposes |
| private volatile long _lastFailoverTime = 0; |
| |
| private boolean _compressMessages; |
| private int _messageCompressionThresholdSize; |
| |
| private final Map<String, String> _virtualHostProperties = new HashMap<>(); |
| private volatile boolean _virtualHostPropertiesPopulated; |
| |
| static |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Qpid version : " + CommonProperties.getVersionString()); |
| } |
| |
| // The registering of any additional SASL mechanisms with the Java Security API requires |
| // SecurityManager permissions. In execution environments such as web containers, |
| // this may require adjustments to the Java security.policy. |
| CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance(); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Loaded mechanisms " + registry.getMechanisms()); |
| } |
| } |
| |
| private ConnectionSettings _connectionSettings; |
| private final ConcurrentMap<String, KeyStore> _brokerTrustStores = new ConcurrentHashMap<>(); |
| private Session _brokerTrustStoreSession; |
| |
/**
 * Creates a connection by assembling a connection URL of the form
 * {@code <protocol>://<username>:<password>@<clientName>/<virtualHost>?brokerlist='<broker>'}.
 *
 * @param broker      broker details; passed through {@code BrokerDetails.checkTransport} before use
 * @param username    user name placed in the URL
 * @param password    password placed in the URL
 * @param clientName  client id; {@code null} is rendered as an empty string
 * @param virtualHost virtual host, appended after a "/" separator
 *
 * @throws QpidException      propagated from the {@link ConnectionURL} based constructor
 * @throws URLSyntaxException if the assembled URL cannot be parsed
 */
public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
        throws QpidException, URLSyntaxException
{
    this(new AMQConnectionURL(String.format("%s://%s:%s@%s/%s?brokerlist='%s'",
                                            ConnectionURL.AMQ_PROTOCOL,
                                            username,
                                            password,
                                            (clientName == null) ? "" : clientName,
                                            virtualHost,
                                            BrokerDetails.checkTransport(broker))));
}
| |
/**
 * Creates a connection to a single TCP broker by assembling a connection URL of the form
 * {@code <protocol>://<username>:<password>@<clientName>/<virtualHost>?brokerlist='tcp://<host>:<port>'}.
 * <p>
 * The virtual host may be given with or without a leading "/": the separator between the client name
 * and the virtual host is inserted only when the virtual host does not already start with one, so
 * existing callers passing e.g. {@code "/test"} produce exactly the same URL as before.
 *
 * @param host        broker host name
 * @param port        broker TCP port
 * @param username    user name placed in the URL
 * @param password    password placed in the URL
 * @param clientName  client id; {@code null} is rendered as an empty string
 * @param virtualHost virtual host, with or without a leading "/"
 *
 * @throws QpidException      propagated from the {@link ConnectionURL} based constructor
 * @throws URLSyntaxException if the assembled URL cannot be parsed
 */
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
        throws QpidException, URLSyntaxException
{
    this(new AMQConnectionURL(
            ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
            + ((clientName == null) ? "" : clientName)
            // Without this separator a virtual host such as "test" would be glued onto the client name.
            + ((virtualHost == null || virtualHost.startsWith("/")) ? "" : "/")
            + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"));
}
| |
/**
 * Creates a connection from a connection URL supplied in string form.
 *
 * @param connection text handed to {@code AMQConnectionURL} for parsing
 *
 * @throws QpidException      propagated from the {@link ConnectionURL} based constructor
 * @throws URLSyntaxException if the string cannot be parsed as a connection URL
 */
public AMQConnection(String connection) throws QpidException, URLSyntaxException
{
    this(new AMQConnectionURL(connection));
}
| |
| public AMQConnection(ConnectionURL connectionURL) throws QpidException |
| { |
| boolean success = false; |
| try |
| { |
| if (connectionURL == null) |
| { |
| throw new IllegalArgumentException("Connection must be specified"); |
| } |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Connection(" + _connectionNumber + "):" + connectionURL); |
| } |
| |
| // set this connection maxPrefetch |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null) |
| { |
| _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH)); |
| } |
| else |
| { |
| // use the default value set for all connections |
| _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, |
| ClientProperties.MAX_PREFETCH_DEFAULT)); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) |
| { |
| _syncPersistence = |
| Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); |
| _logger.warn("sync_persistence is a deprecated property, " + |
| "please use sync_publish={persistent|all} instead"); |
| } |
| else |
| { |
| // use the default value set for all connections |
| _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); |
| if (_syncPersistence) |
| { |
| _logger.warn("sync_persistence is a deprecated property, " + |
| "please use sync_publish={persistent|all} instead"); |
| } |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null) |
| { |
| _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK)); |
| } |
| else |
| { |
| // use the default value set for all connections |
| _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_CLIENT_ACK) != null) |
| { |
| _syncClientAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_CLIENT_ACK)); |
| } |
| else |
| { |
| String legacyProperty = System.getProperty("qpid.sync_after_client.ack"); |
| if (legacyProperty != null) |
| { |
| _logger.warn("'qpid.sync_after_client.ack' is a deprecated system property, " + |
| "please use '{}' instead", ClientProperties.SYNC_CLIENT_ACK); |
| } |
| _syncClientAck = Boolean.parseBoolean(System.getProperty(ClientProperties.SYNC_CLIENT_ACK, |
| legacyProperty != null ? legacyProperty : "true")); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null) |
| { |
| _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH); |
| } |
| else |
| { |
| // use the default value set for all connections |
| _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_POPULATE_USER_ID) != null) |
| { |
| _populateUserId = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_POPULATE_USER_ID)); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) |
| { |
| _useLegacyMapMessageFormat = Boolean.parseBoolean( |
| connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT)); |
| } |
| else |
| { |
| // use the default value set for all connections |
| _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null) |
| { |
| _useLegacyStreamMessageFormat = Boolean.parseBoolean( |
| connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT)); |
| } |
| else |
| { |
| // use the default value set for all connections |
| _useLegacyStreamMessageFormat = System.getProperty(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT) == null ? |
| true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); |
| } |
| |
| if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null) |
| { |
| _validateQueueOnSend = Boolean.parseBoolean( |
| connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND)); |
| } |
| else |
| { |
| _validateQueueOnSend = |
| Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false")); |
| } |
| |
| if(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES) != null) |
| { |
| _compressMessages = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_COMPRESS_MESSAGES)); |
| } |
| else |
| { |
| _compressMessages = |
| Boolean.parseBoolean(System.getProperty(ClientProperties.CONNECTION_OPTION_COMPRESS_MESSAGES, |
| String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES))); |
| } |
| |
| if(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE) != null) |
| { |
| _messageCompressionThresholdSize = Integer.valueOf(connectionURL.getOption(ConnectionURL.OPTIONS_MESSAGES_COMPRESSION_THRESHOLD_SIZE)); |
| } |
| else |
| { |
| _messageCompressionThresholdSize = Integer.getInteger(ClientProperties.CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE, |
| ClientProperties.DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE); |
| } |
| if(_messageCompressionThresholdSize <= 0) |
| { |
| _messageCompressionThresholdSize = Integer.MAX_VALUE; |
| } |
| |
| String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("AMQP version " + amqpVersion); |
| } |
| |
| _failoverPolicy = new FailoverPolicy(connectionURL, this); |
| if ("0-8".equals(amqpVersion)) |
| { |
| _delegate = new AMQConnectionDelegate_8_0(this); |
| } |
| else if ("0-9".equals(amqpVersion)) |
| { |
| _delegate = new AMQConnectionDelegate_0_9(this); |
| } |
| else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion)) |
| { |
| _delegate = new AMQConnectionDelegate_0_91(this); |
| } |
| else |
| { |
| _delegate = new AMQConnectionDelegate_0_10(this); |
| } |
| |
| _connectionURL = connectionURL; |
| |
| _clientName = connectionURL.getClientName(); |
| _username = connectionURL.getUsername(); |
| _password = connectionURL.getPassword(); |
| |
| setVirtualHost(connectionURL.getVirtualHost()); |
| |
| if (connectionURL.getDefaultQueueExchangeName() != null) |
| { |
| _defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName(); |
| } |
| |
| if (connectionURL.getDefaultTopicExchangeName() != null) |
| { |
| _defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName(); |
| } |
| |
| if (connectionURL.getTemporaryQueueExchangeName() != null) |
| { |
| _temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName(); |
| } |
| |
| if (connectionURL.getTemporaryTopicExchangeName() != null) |
| { |
| _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); |
| } |
| |
| _protocolHandler = new AMQProtocolHandler(this); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); |
| } |
| |
| // We are not currently connected |
| setConnected(false); |
| |
| if (_clientName != null) |
| { |
| makeConnection(); |
| } |
| |
| _connectionMetaData = new QpidConnectionMetaData(); |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST) != null) |
| { |
| String whiteListedClassHierarchiesString = connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST); |
| _whiteListedClassHierarchies = Arrays.asList(whiteListedClassHierarchiesString.split(",")); |
| } |
| else |
| { |
| final String defaultWhiteListedClassHierarchiesString = System.getProperty(CommonProperties.QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_WHITE_LIST, "*"); |
| _whiteListedClassHierarchies = Arrays.asList(defaultWhiteListedClassHierarchiesString.split(",")); |
| } |
| |
| if (connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST) != null) |
| { |
| String blackListedClassHierarchiesString = connectionURL.getOption(ConnectionURL.OPTIONS_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST); |
| _blackListedClassHierarchies = Arrays.asList(blackListedClassHierarchiesString.split(",")); |
| } |
| else |
| { |
| final String defaultBlackListedClassHierarchiesString = System.getProperty(CommonProperties.QPID_SECURITY_OBJECT_MESSAGE_CLASS_HIERARCHY_BLACK_LIST, ""); |
| _blackListedClassHierarchies = Arrays.asList(defaultBlackListedClassHierarchiesString.split(",")); |
| } |
| success = true; |
| } |
| finally |
| { |
| if (!success) |
| { |
| shutdownTaskPool(); |
| } |
| } |
| } |
| |
| private void makeConnection() throws QpidException |
| { |
| _connectionAttempted = true; |
| if(_clientName == null) |
| { |
| |
| try |
| { |
| InetAddress addr = InetAddress.getLocalHost(); |
| _clientName = addr.getHostName() + System.currentTimeMillis(); |
| } |
| catch (UnknownHostException e) |
| { |
| _clientName = "UnknownHost" + UUID.randomUUID(); |
| } |
| } |
| BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); |
| boolean retryAllowed = true; |
| Exception connectionException = null; |
| while (!isConnected() && retryAllowed && brokerDetails != null) |
| { |
| ProtocolVersion pe = null; |
| try |
| { |
| pe = makeBrokerConnection(brokerDetails); |
| } |
| catch (Exception e) |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info("Unable to connect to broker at " + |
| _failoverPolicy.getCurrentBrokerDetails(), |
| e); |
| } |
| connectionException = e; |
| } |
| |
| if (pe != null) |
| { |
| // reset the delegate to the version returned by the |
| // broker |
| initDelegate(pe); |
| } |
| else if (!isConnected()) |
| { |
| if(connectionException instanceof ConnectionRedirectException) |
| { |
| ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException; |
| retryAllowed = true; |
| brokerDetails = new BrokerDetails(brokerDetails); |
| brokerDetails.setHost(redirect.getHost()); |
| brokerDetails.setPort(redirect.getPort()); |
| _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); |
| |
| } |
| else |
| { |
| retryAllowed = _failoverPolicy.failoverAllowed(); |
| brokerDetails = _failoverPolicy.getNextBrokerDetails(); |
| _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); |
| } |
| |
| } |
| } |
| verifyClientID(); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Are we connected:" + isConnected()); |
| } |
| |
| if (!isConnected()) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); |
| } |
| |
| String message = null; |
| |
| if (connectionException != null) |
| { |
| if (connectionException.getCause() != null) |
| { |
| message = connectionException.getCause().getMessage(); |
| } |
| else |
| { |
| message = connectionException.getMessage(); |
| } |
| } |
| |
| if (message == null) |
| { |
| message = "Unable to Connect"; |
| } |
| else if("".equals(message)) |
| { |
| message = "Unable to Connect:" + connectionException.getClass(); |
| } |
| |
| for (Throwable th = connectionException; th != null; th = th.getCause()) |
| { |
| if (th instanceof UnresolvedAddressException || |
| th instanceof UnknownHostException) |
| { |
| throw new AMQUnresolvedAddressException |
| (message, |
| _failoverPolicy.getCurrentBrokerDetails().toString(), |
| connectionException); |
| } |
| } |
| |
| throw new AMQConnectionFailureException(message, connectionException); |
| } |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); |
| } |
| |
| _sessions.setMaxChannelID(_delegate.getMaxChannelID()); |
| _sessions.setMinChannelID(_delegate.getMinChannelID()); |
| } |
| |
| private void initDelegate(ProtocolVersion pe) throws AMQProtocolException |
| { |
| try |
| { |
| String delegateClassName = String.format |
| ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", |
| pe.getMajorVersion(), pe.getMinorVersion()); |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); |
| } |
| Class c = Class.forName(delegateClassName); |
| Class partypes[] = new Class[1]; |
| partypes[0] = AMQConnection.class; |
| _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); |
| |
| if (!ProtocolVersion.v0_10.equals(_delegate.getProtocolVersion())) |
| { |
| _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion()); |
| } |
| |
| // reset state waiter state |
| _protocolHandler.getStateManager().clearLastException(); |
| _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED); |
| } |
| catch (ClassNotFoundException e) |
| { |
| String errorMessage = String.format("Protocol: %s.%s is required by the broker but is not " + |
| "currently supported by this client library implementation", |
| pe.getMajorVersion(), pe.getMinorVersion()); |
| |
| throw new AMQProtocolException(errorMessage, e); |
| } |
| catch (NoSuchMethodException e) |
| { |
| throw new RuntimeException("unable to locate constructor for delegate", e); |
| } |
| catch (InstantiationException e) |
| { |
| throw new RuntimeException("error instantiating delegate", e); |
| } |
| catch (IllegalAccessException e) |
| { |
| throw new RuntimeException("error accessing delegate", e); |
| } |
| catch (InvocationTargetException e) |
| { |
| throw new RuntimeException("error invoking delegate", e); |
| } |
| } |
| |
| private void setVirtualHost(String virtualHost) |
| { |
| if (virtualHost != null && virtualHost.startsWith("/")) |
| { |
| virtualHost = virtualHost.substring(1); |
| } |
| |
| _virtualHost = virtualHost; |
| } |
| |
| public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure) |
| { |
| BrokerDetails bd = new BrokerDetails(_failoverPolicy.getCurrentBrokerDetails()); |
| bd.setHost(host); |
| bd.setPort(port); |
| |
| _failoverPolicy.setBroker(bd); |
| |
| try |
| { |
| makeBrokerConnection(bd); |
| |
| return true; |
| } |
| catch (Exception e) |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info("Unable to connect to broker at " + bd); |
| } |
| |
| return useFailoverConfigOnFailure && attemptReconnection(); |
| } |
| |
| } |
| |
| public boolean attemptReconnection() |
| { |
| BrokerDetails broker; |
| while (!isClosed() && !isClosing() && _failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) |
| { |
| if (attemptConnection(broker)) |
| { |
| return true; |
| } |
| } |
| |
| // connection unsuccessful |
| return false; |
| } |
| |
| private boolean attemptConnection(final BrokerDetails broker) |
| { |
| try |
| { |
| makeBrokerConnection(broker); |
| return true; |
| } |
| catch (Exception e) |
| { |
| if (!(e instanceof QpidException)) |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); |
| } |
| } |
| else |
| { |
| if (_logger.isInfoEnabled()) |
| { |
| _logger.info(e.getMessage() + ":Unable to connect to broker at " |
| + _failoverPolicy.getCurrentBrokerDetails()); |
| } |
| } |
| } |
| return false; |
| } |
| |
| public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, QpidException |
| { |
| return _delegate.makeBrokerConnection(brokerDetail); |
| } |
| |
| public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E |
| { |
| return _delegate.executeRetrySupport(operation); |
| } |
| |
| /** |
| * Get the details of the currently active broker |
| * |
| * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance |
| * otherwise |
| */ |
| public BrokerDetails getActiveBrokerDetails() |
| { |
| return _failoverPolicy.getCurrentBrokerDetails(); |
| } |
| |
| public boolean failoverAllowed() |
| { |
| if (!isConnected()) |
| { |
| return false; |
| } |
| else |
| { |
| return _failoverPolicy.failoverAllowed(); |
| } |
| } |
| |
| public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException |
| { |
| return createSession(transacted, acknowledgeMode, _maxPrefetch); |
| } |
| |
| public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) |
| throws JMSException |
| { |
| return createSession(transacted, acknowledgeMode, prefetch, prefetch); |
| } |
| |
| public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, |
| final int prefetchHigh, final int prefetchLow) throws JMSException |
| { |
| synchronized (_sessionCreationLock) |
| { |
| |
| checkNotClosed(); |
| |
| if(!_connectionAttempted) |
| { |
| try |
| { |
| makeConnection(); |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Unable to establish connection"),e); |
| } |
| } |
| |
| if(_delegate.isVirtualHostPropertiesSupported() && !_virtualHostPropertiesPopulated) |
| { |
| retrieveVirtualHostPropertiesIfNecessary(); |
| } |
| return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow); |
| } |
| } |
| |
| private void retrieveVirtualHostPropertiesIfNecessary() throws JMSException |
| { |
| synchronized (_virtualHostProperties) |
| { |
| if(!_virtualHostPropertiesPopulated) |
| { |
| final Session session = _delegate.createSession(false, AMQSession.NO_ACKNOWLEDGE, 3,3); |
| final MessageConsumer consumer = session.createConsumer(session.createQueue( |
| "ADDR: $virtualhostProperties; {assert: never, create: never, node:{ type: queue }}")); |
| try |
| { |
| ((AMQSession)session).start(); |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Failed to retrieve virtual host properties"), e); |
| } |
| Message propertiesMessage = consumer.receive(getProtocolHandler().getDefaultTimeout()); |
| if(propertiesMessage != null) |
| { |
| for(String property : Collections.list((Enumeration<String>) propertiesMessage.getPropertyNames())) |
| { |
| _virtualHostProperties.put(property, propertiesMessage.getStringProperty(property)); |
| } |
| } |
| session.close(); |
| _virtualHostPropertiesPopulated = true; |
| } |
| } |
| } |
| |
| public KeyStore getBrokerSuppliedTrustStore(final String name) throws JMSException |
| { |
| synchronized(_brokerTrustStores) |
| { |
| if(!_brokerTrustStores.containsKey(name)) |
| { |
| if(_brokerTrustStoreSession == null) |
| { |
| _brokerTrustStoreSession = _delegate.createSession(false, AMQSession.AUTO_ACKNOWLEDGE, 1, 1); |
| try |
| { |
| ((AMQSession) _brokerTrustStoreSession).start(); |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException( |
| "Failed to retrieve virtual host properties"), e); |
| } |
| } |
| final MessageConsumer consumer = _brokerTrustStoreSession.createConsumer(_brokerTrustStoreSession.createQueue( |
| "ADDR: " + name + "; {assert: never, create: never, node:{ type: queue }}")); |
| final Message message = consumer.receive(2000l); |
| if(message != null) |
| { |
| StreamMessage streamMessage = (StreamMessage) message; |
| List<X509Certificate> certs = new ArrayList<>(); |
| try |
| { |
| try |
| { |
| |
| final CertificateFactory certFactory = CertificateFactory.getInstance("X.509"); |
| byte[] bytes; |
| while ((bytes = (byte[]) streamMessage.readObject()) != null) |
| { |
| certs.add((X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream( |
| bytes))); |
| } |
| } |
| catch (MessageEOFException e) |
| { |
| // end of message |
| } |
| KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); |
| |
| char[] encryptionTrustStorePassword = |
| getConnectionSettings().getEncryptionTrustStorePassword() == null |
| ? null |
| : getConnectionSettings().getEncryptionTrustStorePassword().toCharArray(); |
| |
| keyStore.load(null, encryptionTrustStorePassword); |
| int i = 1; |
| for (X509Certificate cert : certs) |
| { |
| keyStore.setCertificateEntry(String.valueOf(i++), cert); |
| } |
| _brokerTrustStores.put(name, keyStore); |
| } |
| catch (JMSException | GeneralSecurityException | IOException e) |
| { |
| _logger.error(e.getMessage(), e); |
| } |
| } |
| |
| } |
| return _brokerTrustStores.get(name); |
| |
| } |
| } |
| |
| public void setFailoverPolicy(FailoverPolicy policy) |
| { |
| _failoverPolicy = policy; |
| } |
| |
| public FailoverPolicy getFailoverPolicy() |
| { |
| return _failoverPolicy; |
| } |
| |
| /** |
| * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in |
| * the JMS spec |
| * |
| * @param transacted |
| * @param acknowledgeMode |
| * |
| * @return QueueSession |
| * |
| * @throws JMSException |
| */ |
| public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException |
| { |
| return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode)); |
| } |
| |
| /** |
| * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in |
| * the JMS spec |
| * |
| * @param transacted |
| * @param acknowledgeMode |
| * |
| * @return TopicSession |
| * |
| * @throws JMSException |
| */ |
| public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException |
| { |
| return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode)); |
| } |
| |
| public boolean channelLimitReached() |
| { |
| return _sessions.size() >= _maximumChannelCount; |
| } |
| |
| public String getClientID() throws JMSException |
| { |
| checkNotClosed(); |
| |
| return _clientName; |
| } |
| |
| public void setClientID(String clientID) throws JMSException |
| { |
| checkNotClosed(); |
| synchronized(_sessionCreationLock) |
| { |
| if(_connectionAttempted) |
| { |
| // in AMQP it is not possible to change the client ID. If one is not specified |
| // upon connection construction, an id is generated automatically. Therefore |
| // we can always throw an exception. |
| if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME)) |
| { |
| throw new IllegalStateException("Client name cannot be changed after being set"); |
| } |
| else |
| { |
| _logger.info("Operation setClientID is ignored using ID: " + getClientID()); |
| } |
| } |
| else |
| { |
| _clientName = clientID; |
| } |
| } |
| } |
| |
| public ConnectionMetaData getMetaData() throws JMSException |
| { |
| checkNotClosed(); |
| |
| return _connectionMetaData; |
| |
| } |
| |
| protected final ExceptionListener getExceptionListenerNoCheck() |
| { |
| return _exceptionListener; |
| } |
| |
| public ExceptionListener getExceptionListener() throws JMSException |
| { |
| checkNotClosed(); |
| return getExceptionListenerNoCheck(); |
| } |
| |
| public void setExceptionListener(ExceptionListener listener) throws JMSException |
| { |
| checkNotClosed(); |
| |
| _exceptionListener = listener; |
| } |
| |
| /** |
| * Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread |
| * and is not thread safe (which is legal according to the JMS specification). |
| * |
| * @throws JMSException |
| */ |
| public void start() throws JMSException |
| { |
| checkNotClosed(); |
| if (!_started) |
| { |
| _started = true; |
| final Iterator it = _sessions.values().iterator(); |
| while (it.hasNext()) |
| { |
| final AMQSession s = (AMQSession) (it.next()); |
| try |
| { |
| s.start(); |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.start failed"), e); |
| } |
| } |
| |
| } |
| } |
| |
| public void stop() throws JMSException |
| { |
| checkNotClosed(); |
| if (_started) |
| { |
| for (Iterator i = _sessions.values().iterator(); i.hasNext();) |
| { |
| try |
| { |
| ((AMQSession) i.next()).stop(); |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.stop failed."), e); |
| } |
| } |
| |
| _started = false; |
| } |
| } |
| |
| public void close() throws JMSException |
| { |
| close(DEFAULT_CLOSE_TIMEOUT); |
| } |
| |
| private void close(long timeout) throws JMSException |
| { |
| boolean closed; |
| |
| synchronized (_sessionCreationLock) |
| { |
| closed = setClosed(); |
| } |
| |
| try |
| { |
| if (!closed) |
| { |
| List<AMQSession> sessions = new ArrayList<>(_sessions.values()); |
| |
| setClosing(true); |
| try |
| { |
| doClose(sessions, timeout); |
| } |
| finally |
| { |
| setClosing(false); |
| } |
| } |
| } |
| finally |
| { |
| shutdownTaskPool(); |
| } |
| } |
| |
| private void doClose(List<AMQSession> sessions, long timeout) throws JMSException |
| { |
| if (!sessions.isEmpty()) |
| { |
| AMQSession session = sessions.remove(0); |
| session.lockMessageDelivery(); |
| try |
| { |
| doClose(sessions, timeout); |
| } |
| finally |
| { |
| session.unlockMessageDelivery(); |
| } |
| } |
| else |
| { |
| synchronized (getFailoverMutex()) |
| { |
| try |
| { |
| closeAllSessions(null, timeout); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing connection", e); |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e); |
| } |
| finally |
| { |
| try |
| { |
| _delegate.closeConnection(timeout); |
| } |
| catch (Exception e) |
| { |
| _logger.warn("Error closing underlying protocol connection", e); |
| } |
| } |
| } |
| } |
| } |
| |
| private void shutdownTaskPool() |
| { |
| _taskPool.shutdown(); |
| } |
| |
| /** |
| * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to |
| * mark objects "visible" in userland as closed after failover or other significant event that impacts the |
| * connection. <p/> The caller must hold the failover mutex before calling this method. |
| */ |
| private void markAllSessionsClosed() |
| { |
| final LinkedList sessionCopy = new LinkedList(_sessions.values()); |
| final Iterator it = sessionCopy.iterator(); |
| while (it.hasNext()) |
| { |
| final AMQSession session = (AMQSession) it.next(); |
| |
| session.markClosed(); |
| } |
| |
| _sessions.clear(); |
| } |
| |
| /** |
| * Close all the sessions, either due to normal connection closure or due to an error occurring. |
| * |
| * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex |
| * before calling this method. |
| */ |
| private void closeAllSessions(Throwable cause, long timeout) throws JMSException |
| { |
| final LinkedList sessionCopy = new LinkedList(_sessions.values()); |
| final Iterator it = sessionCopy.iterator(); |
| JMSException sessionException = null; |
| while (it.hasNext()) |
| { |
| final AMQSession session = (AMQSession) it.next(); |
| if (cause != null) |
| { |
| session.closed(cause); |
| } |
| else |
| { |
| try |
| { |
| session.close(timeout); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing session: " + e); |
| sessionException = e; |
| } |
| } |
| } |
| |
| _sessions.clear(); |
| if (sessionException != null) |
| { |
| throw sessionException; |
| } |
| } |
| |
| public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, |
| ServerSessionPool sessionPool, int maxMessages) throws JMSException |
| { |
| checkNotClosed(); |
| |
| throw new JmsNotImplementedException(); |
| |
| } |
| |
| public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, |
| int maxMessages) throws JMSException |
| { |
| checkNotClosed(); |
| |
| throw new JmsNotImplementedException(); |
| } |
| |
| public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, |
| int maxMessages) throws JMSException |
| { |
| checkNotClosed(); |
| |
| throw new JmsNotImplementedException(); |
| } |
| |
| public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, |
| ServerSessionPool sessionPool, int maxMessages) throws JMSException |
| { |
| checkNotClosed(); |
| |
| throw new JmsNotImplementedException(); |
| } |
| |
| public long getMaximumChannelCount() throws JMSException |
| { |
| checkNotClosed(); |
| |
| return _maximumChannelCount; |
| } |
| |
| public void setConnectionListener(ConnectionListener listener) |
| { |
| _connectionListener = listener; |
| } |
| |
| public ConnectionListener getConnectionListener() |
| { |
| return _connectionListener; |
| } |
| |
| public void setMaximumChannelCount(long maximumChannelCount) |
| { |
| _maximumChannelCount = maximumChannelCount; |
| } |
| |
| public void setMaximumFrameSize(long frameMax) |
| { |
| _maximumFrameSize = frameMax; |
| } |
| |
| public long getMaximumFrameSize() |
| { |
| return _maximumFrameSize; |
| } |
| |
| public ChannelToSessionMap getSessions() |
| { |
| return _sessions; |
| } |
| |
| public String getUsername() |
| { |
| return _username; |
| } |
| |
| public void setUsername(String id) |
| { |
| _username = id; |
| } |
| |
| public String getPassword() |
| { |
| return _password; |
| } |
| |
| public String getVirtualHost() |
| { |
| return _virtualHost; |
| } |
| |
| public final AMQProtocolHandler getProtocolHandler() |
| { |
| return _protocolHandler; |
| } |
| |
| public final boolean started() |
| { |
| return _started; |
| } |
| |
| public final boolean isConnected() |
| { |
| return _connected; |
| } |
| |
| protected final void setConnected(boolean connected) |
| { |
| _connected = connected; |
| } |
| |
| public void bytesSent(long writtenBytes) |
| { |
| if (_connectionListener != null) |
| { |
| _connectionListener.bytesSent(writtenBytes); |
| } |
| } |
| |
| public void bytesReceived(long receivedBytes) |
| { |
| if (_connectionListener != null) |
| { |
| _connectionListener.bytesReceived(receivedBytes); |
| } |
| } |
| |
| /** |
| * Fire the preFailover event to the registered connection listener (if any) |
| * |
| * @param redirect true if this is the result of a redirect request rather than a connection error |
| * |
| * @return true if no listener or listener does not veto change |
| */ |
| public boolean firePreFailover(boolean redirect) |
| { |
| _lastFailoverTime = System.currentTimeMillis(); |
| boolean proceed = true; |
| if (_connectionListener != null) |
| { |
| proceed = _connectionListener.preFailover(redirect); |
| } |
| |
| return proceed; |
| } |
| |
| /** |
| * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes |
| * resubscription then all the sessions are closed. |
| * |
| * @return true if no listener or listener does not veto resubscription. |
| * |
| * @throws JMSException |
| */ |
| public boolean firePreResubscribe() throws JMSException |
| { |
| if (_connectionListener != null) |
| { |
| boolean resubscribe = _connectionListener.preResubscribe(); |
| if (!resubscribe) |
| { |
| markAllSessionsClosed(); |
| } |
| |
| return resubscribe; |
| } |
| else |
| { |
| return true; |
| } |
| } |
| |
| /** Fires a failover complete event to the registered connection listener (if any). */ |
| public void fireFailoverComplete() |
| { |
| if (_connectionListener != null) |
| { |
| _connectionListener.failoverComplete(); |
| } |
| } |
| |
| /** |
| * In order to protect the consistency of the connection and its child sessions, consumers and producers, the |
| * "failover mutex" must be held when doing any operations that could be corrupted during failover. |
| * |
| * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. |
| */ |
| public final Object getFailoverMutex() |
| { |
| return _failoverMutex; |
| } |
| |
| public void resubscribeSessions() throws JMSException, QpidException, FailoverException |
| { |
| _delegate.resubscribeSessions(); |
| } |
| |
| /** |
| * If failover is taking place this will block until it has completed. If failover is not taking place it will |
| * return immediately. |
| * |
| * @throws InterruptedException |
| */ |
| public void blockUntilNotFailingOver() throws InterruptedException |
| { |
| _protocolHandler.blockUntilNotFailingOver(); |
| } |
| |
| /** |
| * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception |
| * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will |
| * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. |
| * |
| * @param cause the exception |
| */ |
| public void exceptionReceived(Throwable cause) |
| { |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); |
| } |
| |
| final JMSException je = convertToJMSException(cause); |
| |
| try |
| { |
| if (hardError(cause)) |
| { |
| closeSessions(cause); |
| } |
| } |
| finally |
| { |
| deliverJMSExceptionToExceptionListenerOrLog(je, cause); |
| } |
| } |
| |
| private JMSException convertToJMSException(Throwable cause) |
| { |
| final JMSException je; |
| if (cause instanceof JMSException) |
| { |
| je = (JMSException) cause; |
| } |
| else |
| { |
| int errorCode = 0; |
| |
| if (cause instanceof AMQException) |
| { |
| errorCode = ((AMQException) cause).getErrorCode(); |
| } |
| |
| if (errorCode != 0) |
| { |
| je = JMSExceptionHelper.chainJMSException(new JMSException("Exception thrown against " |
| + toString() |
| + ": " |
| + cause, Integer.toString(errorCode)), |
| cause); |
| } |
| else |
| { |
| //Should never get here as all AMQEs are required to have an ErrorCode! |
| // Other than AMQDisconnectedEx! |
| |
| if (cause instanceof AMQDisconnectedException) |
| { |
| Exception last = _protocolHandler.getStateManager().getLastException(); |
| if (last != null) |
| { |
| _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception"); |
| cause = last; |
| } |
| } |
| je = JMSExceptionHelper.chainJMSException(new JMSException("Exception thrown against " |
| + toString() |
| + ": " |
| + cause), cause); |
| } |
| } |
| return je; |
| } |
| |
| void closed(Throwable cause) |
| { |
| _logger.debug("Closing closed connection {} ", this.toString()); |
| |
| final JMSException je = convertToJMSException(cause); |
| try |
| { |
| _protocolHandler.getProtocolSession().notifyError(je); |
| boolean performClose = !setClosed(); |
| |
| // if we are closing the connection, close sessions first |
| if (performClose) |
| { |
| closeSessions(cause); |
| } |
| } |
| finally |
| { |
| deliverJMSExceptionToExceptionListenerOrLog(je, cause); |
| } |
| } |
| |
| private void closeSessions(Throwable cause) |
| { |
| // get the failover mutex before trying to close |
| synchronized (getFailoverMutex()) |
| { |
| try |
| { |
| closeAllSessions(cause, -1); |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error closing all sessions: " + e, e); |
| } |
| } |
| } |
| |
| private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause) |
| { |
| final ExceptionListener exceptionListener = getExceptionListenerNoCheck(); |
| if (exceptionListener != null) |
| { |
| performConnectionTask(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| // deliver the exception if there is a listener |
| try |
| { |
| exceptionListener.onException(je); |
| } |
| catch (RuntimeException e) |
| { |
| _logger.error("Exception occurred in ExceptionListener", e); |
| } |
| } |
| }); |
| } |
| else |
| { |
| _logger.error("Throwable Received but no listener set: " + cause); |
| } |
| |
| |
| } |
| |
| private boolean hardError(Throwable cause) |
| { |
| if (cause instanceof AMQException) |
| { |
| return ((AMQException) cause).isHardError(); |
| } |
| |
| return true; |
| } |
| |
| void registerSession(int channelId, AMQSession session) |
| { |
| _sessions.put(channelId, session); |
| } |
| |
| public void deregisterSession(int channelId) |
| { |
| _sessions.remove(channelId); |
| } |
| |
| public String toString() |
| { |
| StringBuffer buf = new StringBuffer("AMQConnection:\n"); |
| if (_failoverPolicy.getCurrentBrokerDetails() == null) |
| { |
| buf.append("No active broker connection"); |
| } |
| else |
| { |
| BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails(); |
| buf.append("Host: ").append(String.valueOf(bd.getHost())); |
| buf.append("\nPort: ").append(String.valueOf(bd.getPort())); |
| } |
| |
| buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost)); |
| buf.append("\nClient ID: ").append(String.valueOf(_clientName)); |
| buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size()); |
| |
| return buf.toString(); |
| } |
| |
| /** |
| * Returns connection url. |
| * @return connection url |
| */ |
| public ConnectionURL getConnectionURL() |
| { |
| return _connectionURL; |
| } |
| |
| /** |
| * Returns stringified connection url. This url is suitable only for display |
| * as {@link AMQConnectionURL#toString()} converts any password to asterisks. |
| * @return connection url |
| */ |
| public String toURL() |
| { |
| return _connectionURL.toString(); |
| } |
| |
| public Reference getReference() throws NamingException |
| { |
| return new Reference(AMQConnection.class.getName(), new StringRefAddr(JNDI_ADDRESS_CONNECTION_URL, toURL()), |
| ObjectFactory.class.getName(), null); // factory location |
| } |
| |
| public String getDefaultTopicExchangeName() |
| { |
| return _defaultTopicExchangeName; |
| } |
| |
| public void setDefaultTopicExchangeName(String defaultTopicExchangeName) |
| { |
| _defaultTopicExchangeName = defaultTopicExchangeName; |
| } |
| |
| public String getDefaultQueueExchangeName() |
| { |
| return _defaultQueueExchangeName; |
| } |
| |
| public void setDefaultQueueExchangeName(String defaultQueueExchangeName) |
| { |
| _defaultQueueExchangeName = defaultQueueExchangeName; |
| } |
| |
| public String getTemporaryTopicExchangeName() |
| { |
| return _temporaryTopicExchangeName; |
| } |
| |
| public String getTemporaryQueueExchangeName() |
| { |
| return _temporaryQueueExchangeName; |
| } |
| |
| public void setTemporaryTopicExchangeName(String temporaryTopicExchangeName) |
| { |
| _temporaryTopicExchangeName = temporaryTopicExchangeName; |
| } |
| |
| public void setTemporaryQueueExchangeName(String temporaryQueueExchangeName) |
| { |
| _temporaryQueueExchangeName = temporaryQueueExchangeName; |
| } |
| |
| public void performConnectionTask(Runnable task) |
| { |
| try |
| { |
| _taskPool.execute(task); |
| } |
| catch (RejectedExecutionException e) |
| { |
| if(!(isClosed() || isClosing())) |
| { |
| throw e; |
| } |
| } |
| } |
| |
| ScheduledFuture<?> scheduleTask(Runnable task, long initialDelay, long period, TimeUnit timeUnit) |
| { |
| return _taskPool.scheduleAtFixedRate(task, initialDelay, period, timeUnit); |
| } |
| |
| public AMQSession getSession(int channelId) |
| { |
| return _sessions.get(channelId); |
| } |
| |
| public ProtocolVersion getProtocolVersion() |
| { |
| return _delegate.getProtocolVersion(); |
| } |
| |
| public String getBrokerUUID() |
| { |
| if(getProtocolVersion().equals(ProtocolVersion.v0_10)) |
| { |
| return ((AMQConnectionDelegate_0_10)_delegate).getUUID(); |
| } |
| else |
| { |
| return null; |
| } |
| } |
| |
| /** |
| * Tests whether the Broker has advertised support for the named feature. |
| * |
| * @param featureName |
| * |
| * @return true if the feature is supported, or false otherwise. |
| */ |
| boolean isSupportedServerFeature(final String featureName) |
| { |
| return _delegate.isSupportedServerFeature(featureName); |
| } |
| |
| public boolean isFailingOver() |
| { |
| return (_protocolHandler.getFailoverLatch() != null); |
| } |
| |
| /** |
| * Get the maximum number of messages that this connection can pre-fetch. |
| * |
| * @return The maximum number of messages that this connection can pre-fetch. |
| */ |
| public long getMaxPrefetch() |
| { |
| return _maxPrefetch; |
| } |
| |
| /** |
| * Indicates whether persistent messages are synchronized |
| * |
| * @return true if persistent messages are synchronized false otherwise |
| */ |
| public boolean getSyncPersistence() |
| { |
| return _syncPersistence; |
| } |
| |
| /** |
| * Indicates whether we need to sync on every message ack |
| */ |
| public boolean getSyncAck() |
| { |
| return _syncAck; |
| } |
| |
| boolean getSyncClientAck() |
| { |
| return _syncClientAck; |
| } |
| |
| public String getSyncPublish() |
| { |
| return _syncPublish; |
| } |
| |
| public boolean isPopulateUserId() |
| { |
| return _populateUserId; |
| } |
| |
| public boolean isMessageCompressionDesired() |
| { |
| return _compressMessages; |
| } |
| |
| public int getNextChannelID() |
| { |
| return _sessions.getNextChannelId(); |
| } |
| |
| public boolean isUseLegacyMapMessageFormat() |
| { |
| return _useLegacyMapMessageFormat; |
| } |
| |
| public boolean isUseLegacyStreamMessageFormat() |
| { |
| return _useLegacyStreamMessageFormat; |
| } |
| |
| private void verifyClientID() throws QpidException |
| { |
| if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID)) |
| { |
| try |
| { |
| if (!_delegate.verifyClientID()) |
| { |
| throw new AMQException(ErrorCodes.ALREADY_EXISTS, "ClientID must be unique"); |
| } |
| } |
| catch(JMSException e) |
| { |
| throw new QpidException(e.getMessage(),e); |
| } |
| } |
| } |
| |
| public long getLastFailoverTime() |
| { |
| return _lastFailoverTime; |
| } |
| |
| protected AMQConnectionDelegate getDelegate() |
| { |
| return _delegate; |
| } |
| |
| public Long getConnectionNumber() |
| { |
| return _connectionNumber; |
| } |
| |
| protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress) |
| { |
| if(_logger.isInfoEnabled()) |
| { |
| _logger.info("Connection " + _connectionNumber + " now connected from " |
| + localAddress + " to " + remoteAddress); |
| } |
| } |
| |
| void setHeartbeatListener(HeartbeatListener listener) |
| { |
| _delegate.setHeartbeatListener(listener); |
| } |
| |
| public boolean validateQueueOnSend() |
| { |
| return _validateQueueOnSend; |
| } |
| |
| public int getMessageCompressionThresholdSize() |
| { |
| return _messageCompressionThresholdSize; |
| } |
| |
| void doWithAllLocks(Runnable r) |
| { |
| doWithAllLocks(r, _sessions.values()); |
| |
| } |
| |
| private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions) |
| { |
| if (!sessions.isEmpty()) |
| { |
| AMQSession session = sessions.remove(0); |
| |
| |
| Object dispatcherLock = session.getDispatcherLock(); |
| if (dispatcherLock == null) |
| { |
| dispatcherLock = new Object(); // use dummy intrinsic lock to make subsequent code nicer |
| } |
| synchronized (dispatcherLock) |
| { |
| session.lockMessageDelivery(); |
| try |
| { |
| doWithAllLocks(r, sessions); |
| } |
| finally |
| { |
| session.unlockMessageDelivery(); |
| } |
| } |
| } |
| else |
| { |
| synchronized (getFailoverMutex()) |
| { |
| r.run(); |
| } |
| } |
| } |
| |
| |
| public String getTemporaryQueuePrefix() |
| { |
| if(_delegate.isVirtualHostPropertiesSupported()) |
| { |
| final String prefix = getVirtualHostProperty("virtualHost.temporaryQueuePrefix"); |
| return prefix == null ? "" : prefix; |
| } |
| else |
| { |
| return ""; |
| } |
| |
| } |
| |
| String getVirtualHostProperty(final String propertyName) |
| { |
| return _virtualHostProperties.get(propertyName); |
| } |
| |
| public void setConnectionSettings(final ConnectionSettings connectionSettings) |
| { |
| _connectionSettings = connectionSettings; |
| } |
| |
| public ConnectionSettings getConnectionSettings() |
| { |
| return _connectionSettings; |
| } |
| |
| @Override |
| public boolean isTrusted(Class<?> clazz) |
| { |
| while (clazz.isArray()) |
| { |
| clazz = clazz.getComponentType(); |
| } |
| |
| if (clazz.isPrimitive()) |
| { |
| return true; |
| } |
| |
| while (clazz != null && (clazz.isAnonymousClass() || clazz.isLocalClass())) |
| { |
| clazz = clazz.getEnclosingClass(); |
| } |
| |
| if (clazz == null || clazz.getCanonicalName() == null) |
| { |
| return false; |
| } |
| |
| String className = clazz.getCanonicalName(); |
| |
| for (String blackListedClassHierarchy : _blackListedClassHierarchies) |
| { |
| if ("*".equals(blackListedClassHierarchy)) |
| { |
| return false; |
| } |
| else if (className != null && (className.equals(blackListedClassHierarchy) || className.startsWith(blackListedClassHierarchy + "."))) |
| { |
| return false; |
| } |
| } |
| |
| for (String whiteListedClassHierarchy : _whiteListedClassHierarchies) |
| { |
| if ("*".equals(whiteListedClassHierarchy)) |
| { |
| return true; |
| } |
| else if (className != null && (className.equals(whiteListedClassHierarchy) || className.startsWith(whiteListedClassHierarchy + "."))) |
| { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| boolean isVirtualHostPropertiesSupported() |
| { |
| return getDelegate().isVirtualHostPropertiesSupported(); |
| } |
| } |