blob: ee3e0767d4a8d221027519f67fa6369f0936fe26 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
public static final class ChannelToSessionMap
{
private final AMQSession[] _fastAccessSessions = new AMQSession[16];
private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
private int _size = 0;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
private AtomicInteger _idFactory = new AtomicInteger(0);
private int _maxChannelID;
private boolean _cycledIds;
public AMQSession get(int channelId)
{
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
return _fastAccessSessions[channelId];
}
else
{
return _slowAccessSessions.get(channelId);
}
}
public AMQSession put(int channelId, AMQSession session)
{
AMQSession oldVal;
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
oldVal = _fastAccessSessions[channelId];
_fastAccessSessions[channelId] = session;
}
else
{
oldVal = _slowAccessSessions.put(channelId, session);
}
if ((oldVal != null) && (session == null))
{
_size--;
}
else if ((oldVal == null) && (session != null))
{
_size++;
}
return session;
}
public AMQSession remove(int channelId)
{
AMQSession session;
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
session = _fastAccessSessions[channelId];
_fastAccessSessions[channelId] = null;
}
else
{
session = _slowAccessSessions.remove(channelId);
}
if (session != null)
{
_size--;
}
return session;
}
public Collection<AMQSession> values()
{
ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
for (int i = 0; i < 16; i++)
{
if (_fastAccessSessions[i] != null)
{
values.add(_fastAccessSessions[i]);
}
}
values.addAll(_slowAccessSessions.values());
return values;
}
public int size()
{
return _size;
}
public void clear()
{
_size = 0;
_slowAccessSessions.clear();
for (int i = 0; i < 16; i++)
{
_fastAccessSessions[i] = null;
}
}
/*
* Synchronized on whole method so that we don't need to consider the
* increment-then-reset path in too much detail
*/
public synchronized int getNextChannelId()
{
int id = 0;
if (!_cycledIds)
{
id = _idFactory.incrementAndGet();
if (id == _maxChannelID)
{
_cycledIds = true;
_idFactory.set(0); // Go back to the start
}
}
else
{
boolean done = false;
while (!done)
{
// Needs to work second time through
id = _idFactory.incrementAndGet();
if (id > _maxChannelID)
{
_idFactory.set(0);
id = _idFactory.incrementAndGet();
}
if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
{
done = (_fastAccessSessions[id] == null);
}
else
{
done = (!_slowAccessSessions.keySet().contains(id));
}
}
}
return id;
}
public void setMaxChannelID(int maxChannelID)
{
_maxChannelID = maxChannelID;
}
}
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
* held by any child objects of this connection such as the session, producers and consumers.
*/
private final Object _failoverMutex = new Object();
private final Object _sessionCreationLock = new Object();
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
* and we must prevent the client from opening too many. Zero means unlimited.
*/
protected long _maximumChannelCount;
/** The maximum size of frame supported by the server */
private long _maximumFrameSize;
/**
* The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
* the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
* handler.
*/
protected AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
private String _clientName;
/** The user name to use for authentication */
private String _username;
/** The password to use for authentication */
private String _password;
/** The virtual path to connect to on the AMQ server */
private String _virtualHost;
protected ExceptionListener _exceptionListener;
private ConnectionListener _connectionListener;
private ConnectionURL _connectionURL;
/**
* Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
* publication.
*/
protected volatile boolean _started;
/** Policy dictating how to failover */
protected FailoverPolicy _failoverPolicy;
/*
* _Connected should be refactored with a suitable wait object.
*/
protected boolean _connected;
/*
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
/** Configuration info for SSL */
private SSLConfiguration _sslConfiguration;
private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
protected AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
private int _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
//Indicates whether we need to sync on every message ack
private boolean _syncAck;
//Indicates the sync publish options (persistent|all)
//By default it's async publish
private String _syncPublish = "";
// Indicates whether to use the old map message format or the
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
/**
* @param broker brokerdetails
* @param username username
* @param password password
* @param clientName clientid
* @param virtualHost virtualhost
*
* @throws AMQException
* @throws URLSyntaxException
*/
public AMQConnection(String broker, String username, String password, String clientName, String virtualHost)
throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
/**
* @param broker brokerdetails
* @param username username
* @param password password
* @param clientName clientid
* @param virtualHost virtualhost
*
* @throws AMQException
* @throws URLSyntaxException
*/
public AMQConnection(String broker, String username, String password, String clientName, String virtualHost,
SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='"
+ AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, null);
}
public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost,
SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(host, port, false, username, password, clientName, virtualHost, sslConfig);
}
public AMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName,
String virtualHost, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(
useSSL
? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost + "?brokerlist='tcp://" + host + ":" + port
+ "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(connection), null);
}
public AMQConnection(String connection, SSLConfiguration sslConfig) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(connection), sslConfig);
}
/**
* @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
* was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
_maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
ClientProperties.MAX_PREFETCH_DEFAULT));
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
_syncPersistence =
Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
_logger.warn("sync_persistence is a deprecated property, " +
"please use sync_publish={persistent|all} instead");
}
else
{
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
if (_syncPersistence)
{
_logger.warn("sync_persistence is a deprecated property, " +
"please use sync_publish={persistent|all} instead");
}
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
{
_syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
}
else
{
// use the defaul value set for all connections
_syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
{
_syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
}
else
{
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
{
_useLegacyMapMessageFormat = Boolean.parseBoolean(
connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT));
}
else
{
// use the default value set for all connections
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
_logger.debug("AMQP version " + amqpVersion);
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
}
else if ("0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_9(this);
}
else if ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_9_1(this);
}
else
{
_delegate = new AMQConnectionDelegate_0_10(this);
}
_sessions.setMaxChannelID(_delegate.getMaxChannelID());
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
throw new IllegalArgumentException("Connection must be specified");
}
_connectionURL = connectionURL;
_clientName = connectionURL.getClientName();
_username = connectionURL.getUsername();
_password = connectionURL.getPassword();
setVirtualHost(connectionURL.getVirtualHost());
if (connectionURL.getDefaultQueueExchangeName() != null)
{
_defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
}
if (connectionURL.getDefaultTopicExchangeName() != null)
{
_defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
}
if (connectionURL.getTemporaryQueueExchangeName() != null)
{
_temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
}
if (connectionURL.getTemporaryTopicExchangeName() != null)
{
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
_protocolHandler = new AMQProtocolHandler(this);
_logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
// We are not currently connected
_connected = false;
boolean retryAllowed = true;
Exception connectionException = null;
while (!_connected && retryAllowed && brokerDetails != null)
{
ProtocolVersion pe = null;
try
{
pe = makeBrokerConnection(brokerDetails);
}
catch (Exception e)
{
if (_logger.isInfoEnabled())
{
_logger.info("Unable to connect to broker at " +
_failoverPolicy.getCurrentBrokerDetails(),
e);
}
connectionException = e;
}
if (pe != null)
{
// reset the delegate to the version returned by the
// broker
initDelegate(pe);
}
else if (!_connected)
{
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
_logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
if (_logger.isDebugEnabled())
{
_logger.debug("Are we connected:" + _connected);
}
if (!_connected)
{
String message = null;
if (connectionException != null)
{
if (connectionException.getCause() != null)
{
message = connectionException.getCause().getMessage();
}
else
{
message = connectionException.getMessage();
}
}
if ((message == null) || message.equals(""))
{
if (message == null)
{
message = "Unable to Connect";
}
else // can only be "" if getMessage() returned it therfore lastException != null
{
message = "Unable to Connect:" + connectionException.getClass();
}
}
for (Throwable th = connectionException; th != null; th = th.getCause())
{
if (th instanceof UnresolvedAddressException ||
th instanceof UnknownHostException)
{
throw new AMQUnresolvedAddressException
(message,
_failoverPolicy.getCurrentBrokerDetails().toString(),
connectionException);
}
}
throw new AMQConnectionFailureException(message, connectionException);
}
_connectionMetaData = new QpidConnectionMetaData(this);
}
protected boolean checkException(Throwable thrown)
{
Throwable cause = thrown.getCause();
if (cause == null)
{
cause = thrown;
}
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
{
String delegateClassName = String.format
("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
pe.getMajorVersion(), pe.getMinorVersion());
_logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
Class c = Class.forName(delegateClassName);
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
_sessions.setMaxChannelID(_delegate.getMaxChannelID());
//Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
}
catch (ClassNotFoundException e)
{
throw new AMQProtocolException
(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
String.format("Protocol: %s.%s is rquired by the broker but is not " +
"currently supported by this client library implementation",
pe.getMajorVersion(), pe.getMinorVersion()),
e);
}
catch (NoSuchMethodException e)
{
throw new RuntimeException("unable to locate constructor for delegate", e);
}
catch (InstantiationException e)
{
throw new RuntimeException("error instantiating delegate", e);
}
catch (IllegalAccessException e)
{
throw new RuntimeException("error accessing delegate", e);
}
catch (InvocationTargetException e)
{
throw new RuntimeException("error invoking delegate", e);
}
}
protected AMQConnection(String username, String password, String clientName, String virtualHost)
{
_clientName = clientName;
_username = username;
_password = password;
setVirtualHost(virtualHost);
}
private void setVirtualHost(String virtualHost)
{
if (virtualHost != null && virtualHost.startsWith("/"))
{
virtualHost = virtualHost.substring(1);
}
_virtualHost = virtualHost;
}
public boolean attemptReconnection(String host, int port)
{
BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
_failoverPolicy.setBroker(bd);
try
{
makeBrokerConnection(bd);
return true;
}
catch (Exception e)
{
if (_logger.isInfoEnabled())
{
_logger.info("Unable to connect to broker at " + bd);
}
attemptReconnection();
}
return false;
}
public boolean attemptReconnection()
{
BrokerDetails broker = null;
while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
{
try
{
makeBrokerConnection(broker);
return true;
}
catch (Exception e)
{
if (!(e instanceof AMQException))
{
if (_logger.isInfoEnabled())
{
_logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
}
}
else
{
if (_logger.isInfoEnabled())
{
_logger.info(e.getMessage() + ":Unable to connect to broker at "
+ _failoverPolicy.getCurrentBrokerDetails());
}
}
}
}
// connection unsuccessful
return false;
}
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
return _delegate.makeBrokerConnection(brokerDetail);
}
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
return _delegate.executeRetrySupport(operation);
}
/**
* Get the details of the currently active broker
*
* @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
* otherwise
*/
public BrokerDetails getActiveBrokerDetails()
{
return _failoverPolicy.getCurrentBrokerDetails();
}
public boolean failoverAllowed()
{
if (!_connected)
{
return false;
}
else
{
return _failoverPolicy.failoverAllowed();
}
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
return createSession(transacted, acknowledgeMode, _maxPrefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
synchronized (_sessionCreationLock)
{
checkNotClosed();
return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
}
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
ChannelOpenBody channelOpenBody = getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
// TODO: Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
if (transacted)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Issuing TxSelect for " + channelId);
}
TxSelectBody body = getProtocolHandler().getMethodRegistry().createTxSelectBody();
// TODO: Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
}
private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
try
{
createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
}
catch (AMQException e)
{
deregisterSession(channelId);
throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
}
}
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
}
public FailoverPolicy getFailoverPolicy()
{
return _failoverPolicy;
}
/**
* Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
* the JMS spec
*
* @param transacted
* @param acknowledgeMode
*
* @return QueueSession
*
* @throws JMSException
*/
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
{
return new AMQQueueSessionAdaptor(createSession(transacted, acknowledgeMode));
}
/**
* Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
* the JMS spec
*
* @param transacted
* @param acknowledgeMode
*
* @return TopicSession
*
* @throws JMSException
*/
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
{
return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
}
public boolean channelLimitReached()
{
return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
}
public String getClientID() throws JMSException
{
checkNotClosed();
return _clientName;
}
public void setClientID(String clientID) throws JMSException
{
checkNotClosed();
// in AMQP it is not possible to change the client ID. If one is not specified
// upon connection construction, an id is generated automatically. Therefore
// we can always throw an exception.
if (!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
{
throw new IllegalStateException("Client name cannot be changed after being set");
}
else
{
_logger.info("Operation setClientID is ignored using ID: " + getClientID());
}
}
public ConnectionMetaData getMetaData() throws JMSException
{
checkNotClosed();
return _connectionMetaData;
}
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
return _exceptionListener;
}
public void setExceptionListener(ExceptionListener listener) throws JMSException
{
checkNotClosed();
_exceptionListener = listener;
}
/**
* Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
* and is not thread safe (which is legal according to the JMS specification).
*
* @throws JMSException
*/
public void start() throws JMSException
{
checkNotClosed();
if (!_started)
{
_started = true;
final Iterator it = _sessions.values().iterator();
while (it.hasNext())
{
final AMQSession s = (AMQSession) (it.next());
try
{
s.start();
}
catch (AMQException e)
{
throw new JMSAMQException(e);
}
}
}
}
public void stop() throws JMSException
{
checkNotClosed();
if (_started)
{
for (Iterator i = _sessions.values().iterator(); i.hasNext();)
{
try
{
((AMQSession) i.next()).stop();
}
catch (AMQException e)
{
throw new JMSAMQException(e);
}
}
_started = false;
}
}
public void close() throws JMSException
{
close(DEFAULT_TIMEOUT);
}
public void close(long timeout) throws JMSException
{
close(new ArrayList<AMQSession>(_sessions.values()), timeout);
}
public void close(List<AMQSession> sessions, long timeout) throws JMSException
{
if (!_closed.getAndSet(true))
{
_closing.set(true);
try
{
synchronized (getFailoverMutex())
{
doClose(sessions, timeout);
}
}
finally
{
_closing.set(false);
}
}
}
private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
{
synchronized (_sessionCreationLock)
{
if (!sessions.isEmpty())
{
AMQSession session = sessions.remove(0);
synchronized (session.getMessageDeliveryLock())
{
doClose(sessions, timeout);
}
}
else
{
try
{
long startCloseTime = System.currentTimeMillis();
closeAllSessions(null, timeout, startCloseTime);
//This MUST occur after we have successfully closed all Channels/Sessions
_taskPool.shutdown();
if (!_taskPool.isTerminated())
{
try
{
// adjust timeout
long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
_taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
_logger.info("Interrupted while shutting down connection thread pool.");
}
}
// adjust timeout
timeout = adjustTimeout(timeout, startCloseTime);
_delegate.closeConnection(timeout);
//If the taskpool hasn't shutdown by now then give it shutdownNow.
// This will interupt any running tasks.
if (!_taskPool.isTerminated())
{
List<Runnable> tasks = _taskPool.shutdownNow();
for (Runnable r : tasks)
{
_logger.warn("Connection close forced taskpool to prevent execution:" + r);
}
}
}
catch (AMQException e)
{
_logger.error("error:", e);
JMSException jmse = new JMSException("Error closing connection: " + e);
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
}
}
}
}
private long adjustTimeout(long timeout, long startTime)
{
long now = System.currentTimeMillis();
timeout -= now - startTime;
if (timeout < 0)
{
timeout = 0;
}
return timeout;
}
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
* connection. <p/> The caller must hold the failover mutex before calling this method.
*/
private void markAllSessionsClosed()
{
final LinkedList sessionCopy = new LinkedList(_sessions.values());
final Iterator it = sessionCopy.iterator();
while (it.hasNext())
{
final AMQSession session = (AMQSession) it.next();
session.markClosed();
}
_sessions.clear();
}
/**
* Close all the sessions, either due to normal connection closure or due to an error occurring.
*
* @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
* before calling this method.
*/
private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException
{
final LinkedList sessionCopy = new LinkedList(_sessions.values());
final Iterator it = sessionCopy.iterator();
JMSException sessionException = null;
while (it.hasNext())
{
final AMQSession session = (AMQSession) it.next();
if (cause != null)
{
session.closed(cause);
}
else
{
try
{
if (starttime != -1)
{
timeout = adjustTimeout(timeout, starttime);
}
session.close(timeout);
}
catch (JMSException e)
{
_logger.error("Error closing session: " + e);
sessionException = e;
}
}
}
_sessions.clear();
if (sessionException != null)
{
throw sessionException;
}
}
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
checkNotClosed();
return null;
}
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException
{
checkNotClosed();
return null;
}
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException
{
checkNotClosed();
return null;
}
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
// TODO Auto-generated method stub
checkNotClosed();
return null;
}
public long getMaximumChannelCount() throws JMSException
{
checkNotClosed();
return _maximumChannelCount;
}
public void setConnectionListener(ConnectionListener listener)
{
_connectionListener = listener;
}
public ConnectionListener getConnectionListener()
{
return _connectionListener;
}
public void setMaximumChannelCount(long maximumChannelCount)
{
_maximumChannelCount = maximumChannelCount;
}
public void setMaximumFrameSize(long frameMax)
{
_maximumFrameSize = frameMax;
}
public long getMaximumFrameSize()
{
return _maximumFrameSize;
}
public ChannelToSessionMap getSessions()
{
return _sessions;
}
public String getUsername()
{
return _username;
}
public void setUsername(String id)
{
_username = id;
}
public String getPassword()
{
return _password;
}
public String getVirtualHost()
{
return _virtualHost;
}
public AMQProtocolHandler getProtocolHandler()
{
return _protocolHandler;
}
public boolean started()
{
return _started;
}
public void bytesSent(long writtenBytes)
{
if (_connectionListener != null)
{
_connectionListener.bytesSent(writtenBytes);
}
}
public void bytesReceived(long receivedBytes)
{
if (_connectionListener != null)
{
_connectionListener.bytesReceived(receivedBytes);
}
}
/**
* Fire the preFailover event to the registered connection listener (if any)
*
* @param redirect true if this is the result of a redirect request rather than a connection error
*
* @return true if no listener or listener does not veto change
*/
public boolean firePreFailover(boolean redirect)
{
boolean proceed = true;
if (_connectionListener != null)
{
proceed = _connectionListener.preFailover(redirect);
}
return proceed;
}
/**
* Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
* resubscription then all the sessions are closed.
*
* @return true if no listener or listener does not veto resubscription.
*
* @throws JMSException
*/
public boolean firePreResubscribe() throws JMSException
{
if (_connectionListener != null)
{
boolean resubscribe = _connectionListener.preResubscribe();
if (!resubscribe)
{
markAllSessionsClosed();
}
return resubscribe;
}
else
{
return true;
}
}
/** Fires a failover complete event to the registered connection listener (if any). */
public void fireFailoverComplete()
{
if (_connectionListener != null)
{
_connectionListener.failoverComplete();
}
}
/**
* In order to protect the consistency of the connection and its child sessions, consumers and producers, the
* "failover mutex" must be held when doing any operations that could be corrupted during failover.
*
* @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
*/
public final Object getFailoverMutex()
{
return _failoverMutex;
}
public void failoverPrep()
{
_delegate.failoverPrep();
}
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
_delegate.resubscribeSessions();
}
/**
* If failover is taking place this will block until it has completed. If failover is not taking place it will
* return immediately.
*
* @throws InterruptedException
*/
public void blockUntilNotFailingOver() throws InterruptedException
{
_protocolHandler.blockUntilNotFailingOver();
}
/**
* Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
* to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
* propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
*
* @param cause the exception
*/
public void exceptionReceived(Throwable cause)
{
if (_logger.isDebugEnabled())
{
_logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
}
final JMSException je;
if (cause instanceof JMSException)
{
je = (JMSException) cause;
}
else
{
AMQConstant code = null;
if (cause instanceof AMQException)
{
code = ((AMQException) cause).getErrorCode();
}
if (code != null)
{
je = new JMSException(Integer.toString(code.getCode()), "Exception thrown against " + toString() + ": " + cause);
}
else
{
//Should never get here as all AMQEs are required to have an ErrorCode!
// Other than AMQDisconnectedEx!
if (cause instanceof AMQDisconnectedException)
{
Exception last = _protocolHandler.getStateManager().getLastException();
if (last != null)
{
_logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
cause = last;
}
}
je = new JMSException("Exception thrown against " + toString() + ": " + cause);
}
if (cause instanceof Exception)
{
je.setLinkedException((Exception) cause);
}
je.initCause(cause);
}
boolean closer = false;
// in the case of an IOException, MINA has closed the protocol session so we set _closed to true
// so that any generic client code that tries to close the connection will not mess up this error
// handling sequence
if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
{
// If we have an IOE/AMQDisconnect there is no connection to close on.
_closing.set(false);
closer = !_closed.getAndSet(true);
_protocolHandler.getProtocolSession().notifyError(je);
}
// get the failover mutex before trying to close
synchronized (getFailoverMutex())
{
// decide if we are going to close the session
if (hardError(cause))
{
closer = (!_closed.getAndSet(true)) || closer;
{
_logger.info("Closing AMQConnection due to :" + cause);
}
}
else
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
// if we are closing the connection, close sessions first
if (closer)
{
try
{
closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
catch (JMSException e)
{
_logger.error("Error closing all sessions: " + e, e);
}
}
// deliver the exception if there is a listener
if (_exceptionListener != null)
{
_exceptionListener.onException(je);
}
else
{
_logger.error("Throwable Received but no listener set: " + cause);
}
}
}
private boolean hardError(Throwable cause)
{
if (cause instanceof AMQException)
{
return ((AMQException) cause).isHardError();
}
return true;
}
void registerSession(int channelId, AMQSession session)
{
_sessions.put(channelId, session);
}
public void deregisterSession(int channelId)
{
_sessions.remove(channelId);
}
public String toString()
{
StringBuffer buf = new StringBuffer("AMQConnection:\n");
if (_failoverPolicy.getCurrentBrokerDetails() == null)
{
buf.append("No active broker connection");
}
else
{
BrokerDetails bd = _failoverPolicy.getCurrentBrokerDetails();
buf.append("Host: ").append(String.valueOf(bd.getHost()));
buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
}
buf.append("\nVirtual Host: ").append(String.valueOf(_virtualHost));
buf.append("\nClient ID: ").append(String.valueOf(_clientName));
buf.append("\nActive session count: ").append((_sessions == null) ? 0 : _sessions.size());
return buf.toString();
}
public String toURL()
{
return _connectionURL.toString();
}
public Reference getReference() throws NamingException
{
return new Reference(AMQConnection.class.getName(), new StringRefAddr(AMQConnection.class.getName(), toURL()),
AMQConnectionFactory.class.getName(), null); // factory location
}
public SSLConfiguration getSSLConfiguration()
{
return _sslConfiguration;
}
public AMQShortString getDefaultTopicExchangeName()
{
return _defaultTopicExchangeName;
}
public void setDefaultTopicExchangeName(AMQShortString defaultTopicExchangeName)
{
_defaultTopicExchangeName = defaultTopicExchangeName;
}
public AMQShortString getDefaultQueueExchangeName()
{
return _defaultQueueExchangeName;
}
public void setDefaultQueueExchangeName(AMQShortString defaultQueueExchangeName)
{
_defaultQueueExchangeName = defaultQueueExchangeName;
}
public AMQShortString getTemporaryTopicExchangeName()
{
return _temporaryTopicExchangeName;
}
public AMQShortString getTemporaryQueueExchangeName()
{
return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates.
}
public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName)
{
_temporaryTopicExchangeName = temporaryTopicExchangeName;
}
public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
{
_temporaryQueueExchangeName = temporaryQueueExchangeName;
}
public void performConnectionTask(Runnable task)
{
_taskPool.execute(task);
}
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
}
public ProtocolVersion getProtocolVersion()
{
return _delegate.getProtocolVersion();
}
public boolean isFailingOver()
{
return (_protocolHandler.getFailoverLatch() != null);
}
/**
* Get the maximum number of messages that this connection can pre-fetch.
*
* @return The maximum number of messages that this connection can pre-fetch.
*/
public long getMaxPrefetch()
{
return _maxPrefetch;
}
/**
* Indicates whether persistent messages are synchronized
*
* @return true if persistent messages are synchronized false otherwise
*/
public boolean getSyncPersistence()
{
return _syncPersistence;
}
/**
* Indicates whether we need to sync on every message ack
*/
public boolean getSyncAck()
{
return _syncAck;
}
public String getSyncPublish()
{
return _syncPublish;
}
public int getNextChannelID()
{
return _sessions.getNextChannelId();
}
public boolean isUseLegacyMapMessageFormat()
{
return _useLegacyMapMessageFormat;
}
}