blob: 3ffb53355d19c4f57e6b42665baf966e0f506866 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.ra;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicSession;
import javax.naming.Reference;
import javax.resource.Referenceable;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements the JMS Connection API and produces {@link QpidRASessionImpl} objects.
*
*/
public class QpidRASessionFactoryImpl implements QpidRASessionFactory, Referenceable
{
/** The logger */
private static final Logger _log = LoggerFactory.getLogger(QpidRASessionFactoryImpl.class);
/** Are we closed? */
private boolean _closed = false;
/** The naming reference */
private Reference _reference;
/** The user name */
private String _userName;
/** The password */
private String _password;
/** The client ID */
private String _clientID;
/** The connection type */
private final int _type;
/** Whether we are started */
private boolean _started = false;
/** The managed connection factory */
private final QpidRAManagedConnectionFactory _mcf;
/** The connection manager */
private ConnectionManager _cm;
/** The sessions */
private final Set<QpidRASession> _sessions = new HashSet<QpidRASession>();
/** The temporary queues */
private final Set<TemporaryQueue> _tempQueues = new HashSet<TemporaryQueue>();
/** The temporary topics */
private final Set<TemporaryTopic> _tempTopics = new HashSet<TemporaryTopic>();
/**
* Constructor
* @param mcf The managed connection factory
* @param cm The connection manager
* @param type The connection type
*/
public QpidRASessionFactoryImpl(final QpidRAManagedConnectionFactory mcf,
final ConnectionManager cm,
final int type)
{
this._mcf = mcf;
if (cm == null)
{
this._cm = new QpidRAConnectionManager();
}
else
{
this._cm = cm;
}
this._type = type;
if (_log.isTraceEnabled())
{
_log.trace("constructor(" + mcf + ", " + cm + ", " + type);
}
}
/**
* Set the naming reference
* @param reference The reference
*/
public void setReference(final Reference reference)
{
if (_log.isTraceEnabled())
{
_log.trace("setReference(" + reference + ")");
}
this._reference = reference;
}
/**
* Get the naming reference
* @return The reference
*/
public Reference getReference()
{
if (_log.isTraceEnabled())
{
_log.trace("getReference()");
}
return _reference;
}
/**
* Set the user name
* @param name The user name
*/
public void setUserName(final String name)
{
if (_log.isTraceEnabled())
{
_log.trace("setUserName(" + name + ")");
}
_userName = name;
}
/**
* Set the password
* @param password The password
*/
public void setPassword(final String password)
{
if (_log.isTraceEnabled())
{
_log.trace("setPassword(****)");
}
this._password = password;
}
/**
* Get the client ID
* @return The client ID
* @exception JMSException Thrown if an error occurs
*/
public String getClientID() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getClientID()");
}
checkClosed();
if (_clientID == null)
{
try
{
return _mcf.getDefaultAMQConnectionFactory().getConnectionURL().getClientName() ;
}
catch (final ResourceException re)
{
final JMSException jmse = new JMSException("Unexpected exception obtaining resource") ;
jmse.initCause(re) ;
throw jmse ;
}
}
return _clientID;
}
/**
* Set the client ID -- throws IllegalStateException
* @param cID The client ID
* @exception JMSException Thrown if an error occurs
*/
public void setClientID(final String cID) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("setClientID(" + cID + ")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Create a queue session
* @param transacted Use transactions
* @param acknowledgeMode The acknowledge mode
* @return The queue session
* @exception JMSException Thrown if an error occurs
*/
public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createQueueSession(" + transacted + ", " + acknowledgeMode + ")");
}
checkClosed();
if (_type == QpidRAConnectionFactory.TOPIC_CONNECTION || _type == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new IllegalStateException("Can not get a queue session from a topic connection");
}
return (QueueSession)allocateConnection(transacted, acknowledgeMode, _type);
}
/**
* Create a XA queue session
* @return The XA queue session
* @exception JMSException Thrown if an error occurs
*/
public XAQueueSession createXAQueueSession() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createXAQueueSession()");
}
checkClosed();
if (_type == QpidRAConnectionFactory.CONNECTION || _type == QpidRAConnectionFactory.TOPIC_CONNECTION ||
_type == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new IllegalStateException("Can not get a topic session from a queue connection");
}
return (XAQueueSession) allocateConnection(_type);
}
/**
* Create a connection consumer -- throws IllegalStateException
* @param queue The queue
* @param messageSelector The message selector
* @param sessionPool The session pool
* @param maxMessages The number of max messages
* @return The connection consumer
* @exception JMSException Thrown if an error occurs
*/
public ConnectionConsumer createConnectionConsumer(final Queue queue,
final String messageSelector,
final ServerSessionPool sessionPool,
final int maxMessages) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createConnectionConsumer(" + queue +
", " +
messageSelector +
", " +
sessionPool +
", " +
maxMessages +
")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Create a topic session
* @param transacted Use transactions
* @param acknowledgeMode The acknowledge mode
* @return The topic session
* @exception JMSException Thrown if an error occurs
*/
public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createTopicSession(" + transacted + ", " + acknowledgeMode + ")");
}
checkClosed();
if (_type == QpidRAConnectionFactory.QUEUE_CONNECTION || _type == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Can not get a topic session from a queue connection");
}
return (TopicSession) allocateConnection(transacted, acknowledgeMode, _type);
}
/**
* Create a XA topic session
* @return The XA topic session
* @exception JMSException Thrown if an error occurs
*/
public XATopicSession createXATopicSession() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createXATopicSession()");
}
checkClosed();
if (_type == QpidRAConnectionFactory.CONNECTION || _type == QpidRAConnectionFactory.QUEUE_CONNECTION ||
_type == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Can not get a topic session from a queue connection");
}
return (XATopicSession) allocateConnection(_type);
}
/**
* Create a connection consumer -- throws IllegalStateException
* @param topic The topic
* @param messageSelector The message selector
* @param sessionPool The session pool
* @param maxMessages The number of max messages
* @return The connection consumer
* @exception JMSException Thrown if an error occurs
*/
public ConnectionConsumer createConnectionConsumer(final Topic topic,
final String messageSelector,
final ServerSessionPool sessionPool,
final int maxMessages) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createConnectionConsumer(" + topic +
", " +
messageSelector +
", " +
sessionPool +
", " +
maxMessages +
")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Create a durable connection consumer -- throws IllegalStateException
* @param topic The topic
* @param subscriptionName The subscription name
* @param messageSelector The message selector
* @param sessionPool The session pool
* @param maxMessages The number of max messages
* @return The connection consumer
* @exception JMSException Thrown if an error occurs
*/
public ConnectionConsumer createDurableConnectionConsumer(final Topic topic,
final String subscriptionName,
final String messageSelector,
final ServerSessionPool sessionPool,
final int maxMessages) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createConnectionConsumer(" + topic +
", " +
subscriptionName +
", " +
messageSelector +
", " +
sessionPool +
", " +
maxMessages +
")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Create a connection consumer -- throws IllegalStateException
* @param destination The destination
* @param pool The session pool
* @param maxMessages The number of max messages
* @return The connection consumer
* @exception JMSException Thrown if an error occurs
*/
public ConnectionConsumer createConnectionConsumer(final Destination destination,
final ServerSessionPool pool,
final int maxMessages) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createConnectionConsumer(" + destination +
", " +
pool +
", " +
maxMessages +
")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Create a connection consumer -- throws IllegalStateException
* @param destination The destination
* @param name The name
* @param pool The session pool
* @param maxMessages The number of max messages
* @return The connection consumer
* @exception JMSException Thrown if an error occurs
*/
public ConnectionConsumer createConnectionConsumer(final Destination destination,
final String name,
final ServerSessionPool pool,
final int maxMessages) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createConnectionConsumer(" + destination +
", " +
name +
", " +
pool +
", " +
maxMessages +
")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Create a session
* @param transacted Use transactions
* @param acknowledgeMode The acknowledge mode
* @return The session
* @exception JMSException Thrown if an error occurs
*/
public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createSession(" + transacted + ", " + acknowledgeMode + ")");
}
checkClosed();
return (Session) allocateConnection(transacted, acknowledgeMode, _type);
}
/**
* Create a XA session
* @return The XA session
* @exception JMSException Thrown if an error occurs
*/
public XASession createXASession() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("createXASession()");
}
checkClosed();
return (XASession) allocateConnection(_type);
}
/**
* Get the connection metadata
* @return The connection metadata
* @exception JMSException Thrown if an error occurs
*/
public ConnectionMetaData getMetaData() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getMetaData()");
}
checkClosed();
return _mcf.getMetaData();
}
/**
* Get the exception listener -- throws IllegalStateException
* @return The exception listener
* @exception JMSException Thrown if an error occurs
*/
public ExceptionListener getExceptionListener() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getExceptionListener()");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Set the exception listener -- throws IllegalStateException
* @param listener The exception listener
* @exception JMSException Thrown if an error occurs
*/
public void setExceptionListener(final ExceptionListener listener) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("setExceptionListener(" + listener + ")");
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Start
* @exception JMSException Thrown if an error occurs
*/
public void start() throws JMSException
{
checkClosed();
if (_log.isTraceEnabled())
{
_log.trace("start() " + this);
}
synchronized (_sessions)
{
if (_started)
{
return;
}
_started = true;
for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();)
{
QpidRASession session = (QpidRASession)i.next();
session.start();
}
}
}
/**
* Stop -- throws IllegalStateException
* @exception JMSException Thrown if an error occurs
*/
public void stop() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("stop() " + this);
}
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
/**
* Close
* @exception JMSException Thrown if an error occurs
*/
public void close() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("close() " + this);
}
if (_closed)
{
return;
}
_closed = true;
synchronized (_sessions)
{
for (Iterator<QpidRASession> i = _sessions.iterator(); i.hasNext();)
{
QpidRASession session = (QpidRASession)i.next();
try
{
session.closeSession();
}
catch (Throwable t)
{
_log.trace("Error closing session", t);
}
i.remove();
}
}
synchronized (_tempQueues)
{
for (Iterator<TemporaryQueue> i = _tempQueues.iterator(); i.hasNext();)
{
TemporaryQueue temp = (TemporaryQueue)i.next();
try
{
if (_log.isTraceEnabled())
{
_log.trace("Closing temporary queue " + temp + " for " + this);
}
temp.delete();
}
catch (Throwable t)
{
_log.trace("Error deleting temporary queue", t);
}
i.remove();
}
}
synchronized (_tempTopics)
{
for (Iterator<TemporaryTopic> i = _tempTopics.iterator(); i.hasNext();)
{
TemporaryTopic temp = (TemporaryTopic)i.next();
try
{
if (_log.isTraceEnabled())
{
_log.trace("Closing temporary topic " + temp + " for " + this);
}
temp.delete();
}
catch (Throwable t)
{
_log.trace("Error deleting temporary queue", t);
}
i.remove();
}
}
}
/**
* Close session
* @param session The session
* @exception JMSException Thrown if an error occurs
*/
public void closeSession(final QpidRASession session) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("closeSession(" + session + ")");
}
synchronized (_sessions)
{
_sessions.clear();
}
}
/**
* Add temporary queue
* @param temp The temporary queue
*/
public void addTemporaryQueue(final TemporaryQueue temp)
{
if (_log.isTraceEnabled())
{
_log.trace("addTemporaryQueue(" + temp + ")");
}
synchronized (_tempQueues)
{
_tempQueues.add(temp);
}
}
/**
* Add temporary topic
* @param temp The temporary topic
*/
public void addTemporaryTopic(final TemporaryTopic temp)
{
if (_log.isTraceEnabled())
{
_log.trace("addTemporaryTopic(" + temp + ")");
}
synchronized (_tempTopics)
{
_tempTopics.add(temp);
}
}
/**
* Allocation a connection
* @param sessionType The session type
* @return The session
* @exception JMSException Thrown if an error occurs
*/
protected QpidRASession allocateConnection(final int sessionType) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("allocateConnection(" + sessionType + ")");
}
try
{
synchronized (_sessions)
{
if (_sessions.isEmpty() == false)
{
throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
}
QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(sessionType);
info.setUserName(_userName);
info.setPassword(_password);
info.setClientId(_clientID);
info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
if (_log.isTraceEnabled())
{
_log.trace("Allocating session for " + this + " with request info=" + info);
}
QpidRASession session = (QpidRASession)_cm.allocateConnection(_mcf, info);
try
{
if (_log.isTraceEnabled())
{
_log.trace("Allocated " + this + " session=" + session);
}
session.setQpidSessionFactory(this);
if (_started)
{
session.start();
}
_sessions.add(session);
return session;
}
catch (Throwable t)
{
try
{
session.close();
}
catch (Throwable ignored)
{
}
if (t instanceof Exception)
{
throw (Exception)t;
}
else
{
throw new RuntimeException("Unexpected error: ", t);
}
}
}
}
catch (Exception e)
{
_log.error("Could not create session", e);
JMSException je = new JMSException("Could not create a session: " + e.getMessage());
je.setLinkedException(e);
je.initCause(e);
throw je;
}
}
/**
* Allocation a connection
* @param transacted Use transactions
* @param acknowledgeMode The acknowledge mode
* @param sessionType The session type
* @return The session
* @exception JMSException Thrown if an error occurs
*/
protected QpidRASession allocateConnection(final boolean transacted, int acknowledgeMode, final int sessionType) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("allocateConnection(" + transacted +
", " +
acknowledgeMode +
", " +
sessionType +
")");
}
try
{
synchronized (_sessions)
{
if (_sessions.isEmpty() == false)
{
throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
}
if (transacted)
{
acknowledgeMode = Session.SESSION_TRANSACTED;
}
QpidRAConnectionRequestInfo info = new QpidRAConnectionRequestInfo(transacted,
acknowledgeMode,
sessionType);
info.setUserName(_userName);
info.setPassword(_password);
info.setClientId(_clientID);
info.setDefaults(_mcf.getDefaultAMQConnectionFactory().getConnectionURL());
if (_log.isTraceEnabled())
{
_log.trace("Allocating session for " + this + " with request info=" + info);
}
QpidRASession session = (QpidRASession)_cm.allocateConnection(_mcf, info);
try
{
if (_log.isTraceEnabled())
{
_log.trace("Allocated " + this + " session=" + session);
}
session.setQpidSessionFactory(this);
if (_started)
{
session.start();
}
_sessions.add(session);
return session;
}
catch (Throwable t)
{
try
{
session.close();
}
catch (Throwable ignored)
{
}
if (t instanceof Exception)
{
throw (Exception)t;
}
else
{
throw new RuntimeException("Unexpected error: ", t);
}
}
}
}
catch (Exception e)
{
_log.error("Could not create session", e);
JMSException je = new JMSException("Could not create a session: " + e.getMessage());
je.setLinkedException(e);
je.initCause(e);
throw je;
}
}
/**
* Check if we are closed
* @exception IllegalStateException Thrown if closed
*/
protected void checkClosed() throws IllegalStateException
{
if (_closed)
{
throw new IllegalStateException("The connection is closed");
}
}
}