blob: a972d6cda0c6733031c3017871ea6f0e0bb64e3e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.ra;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionInProgressException;
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicSession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ManagedConnection;
import javax.transaction.xa.XAResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A joint interface for JMS sessions
*
*/
public class QpidRASessionImpl implements Session, QueueSession, TopicSession, XASession, XAQueueSession, XATopicSession, QpidRASession
{
/** The logger */
private static final Logger _log = LoggerFactory.getLogger(QpidRASessionImpl.class);
/** The managed connection */
private volatile QpidRAManagedConnection _mc;
/** The locked managed connection */
private QpidRAManagedConnection _lockedMC;
/** The connection request info */
private final QpidRAConnectionRequestInfo _cri;
/** The session factory */
private QpidRASessionFactory _sf;
/** The message consumers */
private final Set<MessageConsumer> _consumers;
/** The message producers */
private final Set<MessageProducer> _producers;
/** The queue browsers */
private final Set<QueueBrowser> _browsers;
/** Are we started */
private AtomicBoolean _started = new AtomicBoolean(false) ;
/**
* Constructor
* @param mc The managed connection
* @param cri The connection request info
*/
public QpidRASessionImpl(final QpidRAManagedConnection mc, final QpidRAConnectionRequestInfo cri)
{
if (_log.isTraceEnabled())
{
_log.trace("constructor(" + mc + ", " + cri + ")");
}
this._mc = mc;
this._cri = cri;
_sf = null;
_consumers = new HashSet<MessageConsumer>();
_producers = new HashSet<MessageProducer>();
_browsers = new HashSet<QueueBrowser>();
}
/**
* Set the session factory
* @param sf The session factory
*/
public void setQpidSessionFactory(final QpidRASessionFactory sf)
{
if (_log.isTraceEnabled())
{
_log.trace("setQpidSessionFactory(" + sf + ")");
}
_started.set(false) ;
this._sf = sf;
}
/**
* Lock
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
protected void lock() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("lock()");
}
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
{
mc.tryLock();
_lockedMC = mc ;
}
else
{
throw new IllegalStateException("Connection is not associated with a managed connection. " + this);
}
}
/**
* Unlock
*/
protected void unlock()
{
if (_log.isTraceEnabled())
{
_log.trace("unlock()");
}
if (_lockedMC != null)
{
try
{
_lockedMC.unlock();
}
finally
{
_lockedMC = null ;
}
}
// We recreate the lock when returned to the pool
// so missing the unlock after disassociation is not important
}
/**
* Create a bytes message
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public BytesMessage createBytesMessage() throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createBytesMessage" + Util.asString(session));
}
return session.createBytesMessage();
}
/**
* Create a map message
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public MapMessage createMapMessage() throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createMapMessage" + Util.asString(session));
}
return session.createMapMessage();
}
/**
* Create a message
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public Message createMessage() throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createMessage" + Util.asString(session));
}
return session.createMessage();
}
/**
* Create an object message
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public ObjectMessage createObjectMessage() throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createObjectMessage" + Util.asString(session));
}
return session.createObjectMessage();
}
/**
* Create an object message
* @param object The object
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public ObjectMessage createObjectMessage(final Serializable object) throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createObjectMessage(" + object + ")" + Util.asString(session));
}
return session.createObjectMessage(object);
}
/**
* Create a stream message
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public StreamMessage createStreamMessage() throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createStreamMessage" + Util.asString(session));
}
return session.createStreamMessage();
}
/**
* Create a text message
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public TextMessage createTextMessage() throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createTextMessage" + Util.asString(session));
}
return session.createTextMessage();
}
/**
* Create a text message
* @param string The text
* @return The message
* @exception JMSException Thrown if an error occurs
*/
public TextMessage createTextMessage(final String string) throws JMSException
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createTextMessage(" + string + ")" + Util.asString(session));
}
return session.createTextMessage(string);
}
/**
* Get transacted
* @return True if transacted; otherwise false
* @exception JMSException Thrown if an error occurs
*/
public boolean getTransacted() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getTransacted()");
}
getSessionInternal();
return _cri.isTransacted();
}
/**
* Get the message listener -- throws IllegalStateException
* @return The message listener
* @exception JMSException Thrown if an error occurs
*/
public MessageListener getMessageListener() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getMessageListener()");
}
throw new IllegalStateException("Method not allowed");
}
/**
* Set the message listener -- Throws IllegalStateException
* @param listener The message listener
* @exception JMSException Thrown if an error occurs
*/
public void setMessageListener(final MessageListener listener) throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("setMessageListener(" + listener + ")");
}
throw new IllegalStateException("Method not allowed");
}
/**
* Always throws an Error.
* @exception Error Method not allowed.
*/
public void run()
{
if (_log.isTraceEnabled())
{
_log.trace("run()");
}
throw new Error("Method not allowed");
}
/**
* Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
* managed connection.
* @exception JMSException Failed to close session.
*/
public void close() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("close()");
}
_sf.closeSession(this);
closeSession();
}
/**
* Commit
* @exception JMSException Failed to close session.
*/
public void commit() throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
_cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new TransactionInProgressException("XA connection");
}
lock();
try
{
if (_cri.isTransacted() == false)
{
throw new IllegalStateException("Session is not transacted");
}
if(_lockedMC.isConnectionClosed())
{
throw new IllegalStateException("Attempting to call commit when the underlying connection has been closed.");
}
if (_log.isTraceEnabled())
{
_log.trace("Commit session " + this);
}
getSessionInternal().commit();
}
finally
{
unlock();
}
}
/**
* Rollback
* @exception JMSException Failed to close session.
*/
public void rollback() throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.XA_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION ||
_cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new TransactionInProgressException("XA connection");
}
lock();
try
{
Session session = getSessionInternal();
if (_cri.isTransacted() == false)
{
throw new IllegalStateException("Session is not transacted");
}
if (_log.isTraceEnabled())
{
_log.trace("Rollback session " + this);
}
session.rollback();
}
finally
{
unlock();
}
}
/**
* Recover
* @exception JMSException Failed to close session.
*/
public void recover() throws JMSException
{
lock();
try
{
Session session = getSessionInternal();
if (_cri.isTransacted())
{
throw new IllegalStateException("Session is transacted");
}
if (_log.isTraceEnabled())
{
_log.trace("Recover session " + this);
}
session.recover();
}
finally
{
unlock();
}
}
/**
* Create a topic
* @param topicName The topic name
* @return The topic
* @exception JMSException Thrown if an error occurs
*/
public Topic createTopic(final String topicName) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Cannot create topic for javax.jms.QueueSession");
}
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createTopic " + Util.asString(session) + " topicName=" + topicName);
}
Topic result = session.createTopic(topicName);
if (_log.isTraceEnabled())
{
_log.trace("createdTopic " + Util.asString(session) + " topic=" + result);
}
return result;
}
/**
* Create a topic subscriber
* @param topic The topic
* @return The subscriber
* @exception JMSException Thrown if an error occurs
*/
public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
{
lock();
try
{
TopicSession session = getTopicSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createSubscriber " + Util.asString(session) + " topic=" + topic);
}
TopicSubscriber result = session.createSubscriber(topic);
result = new QpidRATopicSubscriber(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a topic subscriber
* @param topic The topic
* @param messageSelector The message selector
* @param noLocal If true inhibits the delivery of messages published by its own connection
* @return The subscriber
* @exception JMSException Thrown if an error occurs
*/
public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException
{
lock();
try
{
TopicSession session = getTopicSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createSubscriber " + Util.asString(session) +
" topic=" +
topic +
" selector=" +
messageSelector +
" noLocal=" +
noLocal);
}
TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal);
result = new QpidRATopicSubscriber(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a durable topic subscriber
* @param topic The topic
* @param name The name
* @return The subscriber
* @exception JMSException Thrown if an error occurs
*/
public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
}
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createDurableSubscriber " + Util.asString(session) + " topic=" + topic + " name=" + name);
}
TopicSubscriber result = session.createDurableSubscriber(topic, name);
result = new QpidRATopicSubscriber(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a topic subscriber
* @param topic The topic
* @param name The name
* @param messageSelector The message selector
* @param noLocal If true inhibits the delivery of messages published by its own connection
* @return The subscriber
* @exception JMSException Thrown if an error occurs
*/
public TopicSubscriber createDurableSubscriber(final Topic topic,
final String name,
final String messageSelector,
final boolean noLocal) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession");
}
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createDurableSubscriber " + Util.asString(session) +
" topic=" +
topic +
" name=" +
name +
" selector=" +
messageSelector +
" noLocal=" +
noLocal);
}
TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
result = new QpidRATopicSubscriber(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdDurableSubscriber " + Util.asString(session) + " QpidRATopicSubscriber=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a topic publisher
* @param topic The topic
* @return The publisher
* @exception JMSException Thrown if an error occurs
*/
public TopicPublisher createPublisher(final Topic topic) throws JMSException
{
lock();
try
{
TopicSession session = getTopicSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createPublisher " + Util.asString(session) + " topic=" + topic);
}
TopicPublisher result = session.createPublisher(topic);
result = new QpidRATopicPublisher(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdPublisher " + Util.asString(session) + " publisher=" + result);
}
addProducer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a temporary topic
* @return The temporary topic
* @exception JMSException Thrown if an error occurs
*/
public TemporaryTopic createTemporaryTopic() throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession");
}
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createTemporaryTopic " + Util.asString(session));
}
TemporaryTopic temp = session.createTemporaryTopic();
if (_log.isTraceEnabled())
{
_log.trace("createdTemporaryTopic " + Util.asString(session) + " temp=" + temp);
}
_sf.addTemporaryTopic(temp);
return temp;
}
finally
{
unlock();
}
}
/**
* Unsubscribe
* @param name The name
* @exception JMSException Thrown if an error occurs
*/
public void unsubscribe(final String name) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_QUEUE_CONNECTION)
{
throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession");
}
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("unsubscribe " + Util.asString(session) + " name=" + name);
}
session.unsubscribe(name);
}
finally
{
unlock();
}
}
/**
* Create a browser
* @param queue The queue
* @return The browser
* @exception JMSException Thrown if an error occurs
*/
public QueueBrowser createBrowser(final Queue queue) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
}
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createBrowser " + Util.asString(session) + " queue=" + queue);
}
QueueBrowser result = session.createBrowser(queue);
if (_log.isTraceEnabled())
{
_log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
}
return result;
}
/**
* Create a browser
* @param queue The queue
* @param messageSelector The message selector
* @return The browser
* @exception JMSException Thrown if an error occurs
*/
public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
}
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createBrowser " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector);
}
QueueBrowser result = session.createBrowser(queue, messageSelector);
result = new QpidRAQueueBrowser(result, this);
addQueueBrowser(result) ;
if (_log.isTraceEnabled())
{
_log.trace("createdBrowser " + Util.asString(session) + " browser=" + result);
}
return result;
}
/**
* Create a queue
* @param queueName The queue name
* @return The queue
* @exception JMSException Thrown if an error occurs
*/
public Queue createQueue(final String queueName) throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new IllegalStateException("Cannot create browser or javax.jms.TopicSession");
}
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createQueue " + Util.asString(session) + " queueName=" + queueName);
}
Queue result = session.createQueue(queueName);
if (_log.isTraceEnabled())
{
_log.trace("createdQueue " + Util.asString(session) + " queue=" + result);
}
return result;
}
/**
* Create a queue receiver
* @param queue The queue
* @return The queue receiver
* @exception JMSException Thrown if an error occurs
*/
public QueueReceiver createReceiver(final Queue queue) throws JMSException
{
lock();
try
{
QueueSession session = getQueueSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createReceiver " + Util.asString(session) + " queue=" + queue);
}
QueueReceiver result = session.createReceiver(queue);
result = new QpidRAQueueReceiver(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a queue receiver
* @param queue The queue
* @param messageSelector
* @return The queue receiver
* @exception JMSException Thrown if an error occurs
*/
public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException
{
lock();
try
{
QueueSession session = getQueueSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createReceiver " + Util.asString(session) + " queue=" + queue + " selector=" + messageSelector);
}
QueueReceiver result = session.createReceiver(queue, messageSelector);
result = new QpidRAQueueReceiver(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdReceiver " + Util.asString(session) + " receiver=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a queue sender
* @param queue The queue
* @return The queue sender
* @exception JMSException Thrown if an error occurs
*/
public QueueSender createSender(final Queue queue) throws JMSException
{
lock();
try
{
QueueSession session = getQueueSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createSender " + Util.asString(session) + " queue=" + queue);
}
QueueSender result = session.createSender(queue);
result = new QpidRAQueueSender(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdSender " + Util.asString(session) + " sender=" + result);
}
addProducer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a temporary queue
* @return The temporary queue
* @exception JMSException Thrown if an error occurs
*/
public TemporaryQueue createTemporaryQueue() throws JMSException
{
if (_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION || _cri.getType() == QpidRAConnectionFactory.XA_TOPIC_CONNECTION)
{
throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession");
}
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createTemporaryQueue " + Util.asString(session));
}
TemporaryQueue temp = session.createTemporaryQueue();
if (_log.isTraceEnabled())
{
_log.trace("createdTemporaryQueue " + Util.asString(session) + " temp=" + temp);
}
_sf.addTemporaryQueue(temp);
return temp;
}
finally
{
unlock();
}
}
/**
* Create a message consumer
* @param destination The destination
* @return The message consumer
* @exception JMSException Thrown if an error occurs
*/
public MessageConsumer createConsumer(final Destination destination) throws JMSException
{
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createConsumer " + Util.asString(session) + " dest=" + destination);
}
MessageConsumer result = session.createConsumer(destination);
result = new QpidRAMessageConsumer(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a message consumer
* @param destination The destination
* @param messageSelector The message selector
* @return The message consumer
* @exception JMSException Thrown if an error occurs
*/
public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException
{
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createConsumer " + Util.asString(session) +
" dest=" +
destination +
" messageSelector=" +
messageSelector);
}
MessageConsumer result = session.createConsumer(destination, messageSelector);
result = new QpidRAMessageConsumer(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a message consumer
* @param destination The destination
* @param messageSelector The message selector
* @param noLocal If true inhibits the delivery of messages published by its own connection
* @return The message consumer
* @exception JMSException Thrown if an error occurs
*/
public MessageConsumer createConsumer(final Destination destination,
final String messageSelector,
final boolean noLocal) throws JMSException
{
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createConsumer " + Util.asString(session) +
" dest=" +
destination +
" messageSelector=" +
messageSelector +
" noLocal=" +
noLocal);
}
MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal);
result = new QpidRAMessageConsumer(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdConsumer " + Util.asString(session) + " consumer=" + result);
}
addConsumer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Create a message producer
* @param destination The destination
* @return The message producer
* @exception JMSException Thrown if an error occurs
*/
public MessageProducer createProducer(final Destination destination) throws JMSException
{
lock();
try
{
Session session = getSessionInternal();
if (_log.isTraceEnabled())
{
_log.trace("createProducer " + Util.asString(session) + " dest=" + destination);
}
MessageProducer result = session.createProducer(destination);
result = new QpidRAMessageProducer(result, this);
if (_log.isTraceEnabled())
{
_log.trace("createdProducer " + Util.asString(session) + " producer=" + result);
}
addProducer(result);
return result;
}
finally
{
unlock();
}
}
/**
* Get the acknowledge mode
* @return The mode
* @exception JMSException Thrown if an error occurs
*/
public int getAcknowledgeMode() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getAcknowledgeMode()");
}
getSessionInternal();
return _cri.getAcknowledgeMode();
}
/**
* Get the XA resource
* @return The XA resource
*/
public XAResource getXAResource()
{
if (_log.isTraceEnabled())
{
_log.trace("getXAResource()");
}
if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
{
return null;
}
try
{
lock();
return getXAResourceInternal();
}
catch (Throwable t)
{
return null;
}
finally
{
unlock();
}
}
/**
* Get the session
* @return The session
* @exception JMSException Thrown if an error occurs
*/
public Session getSession() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getSession()");
}
if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
{
throw new IllegalStateException("Non XA connection");
}
lock();
try
{
return this;
}
finally
{
unlock();
}
}
/**
* Get the queue session
* @return The queue session
* @exception JMSException Thrown if an error occurs
*/
public QueueSession getQueueSession() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getQueueSession()");
}
if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
{
throw new IllegalStateException("Non XA connection");
}
lock();
try
{
return this;
}
finally
{
unlock();
}
}
/**
* Get the topic session
* @return The topic session
* @exception JMSException Thrown if an error occurs
*/
public TopicSession getTopicSession() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("getTopicSession()");
}
if (_cri.getType() == QpidRAConnectionFactory.CONNECTION || _cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION ||
_cri.getType() == QpidRAConnectionFactory.TOPIC_CONNECTION)
{
throw new IllegalStateException("Non XA connection");
}
lock();
try
{
return this;
}
finally
{
unlock();
}
}
/**
* Set the managed connection
* @param managedConnection The managed connection
*/
void setManagedConnection(final QpidRAManagedConnection managedConnection)
{
if (_log.isTraceEnabled())
{
_log.trace("setManagedConnection(" + managedConnection + ")");
}
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
{
mc.removeHandle(this);
}
this._mc = managedConnection;
}
/** for tests only */
public ManagedConnection getManagedConnection()
{
return _mc;
}
/**
* Destroy
*/
void destroy()
{
if (_log.isTraceEnabled())
{
_log.trace("destroy()");
}
_mc = null;
}
/**
* Start
* @exception JMSException Thrown if an error occurs
*/
public void start() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("start()");
}
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
{
_started.set(true) ;
mc.start();
}
}
/**
* Stop
* @exception JMSException Thrown if an error occurs
*/
void stop() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("stop()");
}
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
{
mc.stop();
_started.set(false) ;
}
}
/**
* Check strict
* @exception JMSException Thrown if an error occurs
*/
void checkStrict() throws JMSException
{
if (_log.isTraceEnabled())
{
_log.trace("checkStrict()");
}
if (_mc != null)
{
throw new IllegalStateException(QpidRASessionFactory.ISE);
}
}
/**
* Close session
* @exception JMSException Thrown if an error occurs
*/
public void closeSession() throws JMSException
{
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
{
_log.trace("Closing session");
try
{
mc.stop();
}
catch (Throwable t)
{
_log.trace("Error stopping managed connection", t);
}
synchronized (_consumers)
{
for (Iterator<MessageConsumer> i = _consumers.iterator(); i.hasNext();)
{
QpidRAMessageConsumer consumer = (QpidRAMessageConsumer)i.next();
try
{
consumer.closeConsumer();
}
catch (Throwable t)
{
_log.trace("Error closing consumer", t);
}
i.remove();
}
}
synchronized (_producers)
{
for (Iterator<MessageProducer> i = _producers.iterator(); i.hasNext();)
{
QpidRAMessageProducer producer = (QpidRAMessageProducer)i.next();
try
{
producer.closeProducer();
}
catch (Throwable t)
{
_log.trace("Error closing producer", t);
}
i.remove();
}
}
synchronized (_browsers)
{
for (Iterator<QueueBrowser> i = _browsers.iterator(); i.hasNext();)
{
QpidRAQueueBrowser browser = (QpidRAQueueBrowser)i.next();
try
{
browser.close();
}
catch (Throwable t)
{
_log.trace("Error closing browser", t);
}
i.remove();
}
}
mc.removeHandle(this);
ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED);
ev.setConnectionHandle(this);
mc.sendEvent(ev);
this._mc = null;
}
}
/**
* Add consumer
* @param consumer The consumer
*/
void addConsumer(final MessageConsumer consumer)
{
if (_log.isTraceEnabled())
{
_log.trace("addConsumer(" + consumer + ")");
}
synchronized (_consumers)
{
_consumers.add(consumer);
}
}
/**
* Remove consumer
* @param consumer The consumer
*/
void removeConsumer(final MessageConsumer consumer)
{
if (_log.isTraceEnabled())
{
_log.trace("removeConsumer(" + consumer + ")");
}
synchronized (_consumers)
{
_consumers.remove(consumer);
}
}
/**
* Add producer
* @param producer The producer
*/
void addProducer(final MessageProducer producer)
{
if (_log.isTraceEnabled())
{
_log.trace("addProducer(" + producer + ")");
}
synchronized (_producers)
{
_producers.add(producer);
}
}
/**
* Remove producer
* @param producer The producer
*/
void removeProducer(final MessageProducer producer)
{
if (_log.isTraceEnabled())
{
_log.trace("removeProducer(" + producer + ")");
}
synchronized (_producers)
{
_producers.remove(producer);
}
}
/**
* Add queue browser
* @param browser The queue browser
*/
void addQueueBrowser(final QueueBrowser browser)
{
if (_log.isTraceEnabled())
{
_log.trace("addQueueBrowser(" + browser + ")");
}
synchronized (_browsers)
{
_browsers.add(browser);
}
}
/**
* Remove queue browser
* @param browser The queue browser
*/
void removeQueueBrowser(final QueueBrowser browser)
{
if (_log.isTraceEnabled())
{
_log.trace("removeQueueBrowser(" + browser + ")");
}
synchronized (_browsers)
{
_browsers.remove(browser);
}
}
/**
* Get the session and ensure that it is open
* @return The session
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
Session getSessionInternal() throws JMSException
{
final QpidRAManagedConnection mc = this._mc;
if (mc == null)
{
throw new IllegalStateException("The session is closed");
}
Session session = mc.getSession();
if (_log.isTraceEnabled())
{
_log.trace("getSessionInternal " + Util.asString(session) + " for " + this);
}
return session;
}
/**
* Get the XA resource and ensure that it is open
* @return The XA Resource
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
XAResource getXAResourceInternal() throws JMSException
{
final QpidRAManagedConnection mc = this._mc;
if (mc == null)
{
throw new IllegalStateException("The session is closed");
}
try
{
XAResource xares = mc.getXAResource();
if (_log.isTraceEnabled())
{
_log.trace("getXAResourceInternal " + xares + " for " + this);
}
return xares;
}
catch (ResourceException e)
{
JMSException jmse = new JMSException("Unable to get XA Resource");
jmse.initCause(e);
throw jmse;
}
}
/**
* Get the queue session
* @return The queue session
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
QueueSession getQueueSessionInternal() throws JMSException
{
Session s = getSessionInternal();
if (!(s instanceof QueueSession))
{
throw new InvalidDestinationException("Attempting to use QueueSession methods on: " + this);
}
return (QueueSession)s;
}
/**
* Get the topic session
* @return The topic session
* @exception JMSException Thrown if an error occurs
* @exception IllegalStateException The session is closed
*/
TopicSession getTopicSessionInternal() throws JMSException
{
Session s = getSessionInternal();
if (!(s instanceof TopicSession))
{
throw new InvalidDestinationException("Attempting to use TopicSession methods on: " + this);
}
return (TopicSession)s;
}
/**
* throws SystemException
* throws RollbackException
*
*/
public void checkState() throws JMSException
{
final QpidRAManagedConnection mc = this._mc;
if (mc != null)
{
mc.checkTransactionActive();
}
}
/**
* Has this session been started?
* @return true if started, false if stopped.
*/
public boolean isStarted()
{
return _started.get();
}
}