blob: 530ed22cabd47b597a12af127fb6ed3c3e0b750c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
* its reference to the ConnectionPool backing it.
*
* <b>NOTE</b> this implementation is only intended for use when sending
* messages. It does not deal with pooling of consumers; for that look at a
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
*
*/
public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
protected ConnectionPool pool;
private volatile boolean stopped;
private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
/**
* Creates a new PooledConnection instance that uses the given ConnectionPool to create
* and manage its resources. The ConnectionPool instance can be shared amongst many
* PooledConnection instances.
*
* @param pool
* The connection and pool manager backing this proxy connection object.
*/
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
}
/**
* Factory method to create a new instance.
*/
public PooledConnection newInstance() {
return new PooledConnection(pool);
}
@Override
public void close() throws JMSException {
this.cleanupAllLoanedSessions();
this.cleanupConnectionTemporaryDestinations();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
}
}
@Override
public void start() throws JMSException {
assertNotClosed();
pool.start();
}
@Override
public void stop() throws JMSException {
stopped = true;
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
}
@Override
public String getClientID() throws JMSException {
return getConnection().getClientID();
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return pool.getParentExceptionListener();
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return getConnection().getMetaData();
}
@Override
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
pool.setParentExceptionListener(exceptionListener);
}
@Override
public void setClientID(String clientID) throws JMSException {
// ignore repeated calls to setClientID() with the same client id
// this could happen when a JMS component such as Spring that uses a
// PooledConnectionFactory shuts down and reinitializes.
if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
getConnection().setClientID(clientID);
}
}
@Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
}
// Session factory methods
// -------------------------------------------------------------------------
@Override
public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
return (QueueSession) createSession(transacted, ackMode);
}
@Override
public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
return (TopicSession) createSession(transacted, ackMode);
}
/**
* Creates a <CODE>Session</CODE> object.
*
* @throws JMSException if the <CODE>Connection</CODE> object fails to
* create a session due to some internal error or lack of
* support for the specific transaction and acknowledgement
* mode.
* @since 2.0
*/
@Override
public Session createSession() throws JMSException {
throw new UnsupportedOperationException("createSession() is unsupported");
}
/**
* Creates a <CODE>Session</CODE> object.
*
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
* session is transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created session
* @throws JMSException if the <CODE>Connection</CODE> object fails to
* create a session due to some internal error or lack of
* support for the specific transaction and acknowledgement
* mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 2.0
*/
@Override
public Session createSession(int sessionMode) throws JMSException {
throw new UnsupportedOperationException("createSession(int sessionMode) is unsupported");
}
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
// Store the session so we can close the sessions that this PooledConnection
// created in order to ensure that consumers etc are closed per the JMS contract.
loanedSessions.add(result);
// Add a event listener to the session that notifies us when the session
// creates / destroys temporary destinations and closes etc.
result.addSessionEventListener(this);
return result;
}
/**
*
* @see javax.jms.ConnectionConsumer
* @since 2.0
*/
@Override
public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new UnsupportedOperationException("createSharedConnectionConsumer() is not supported");
}
/**
*
* @see javax.jms.ConnectionConsumer
* @since 2.0
*/
@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new UnsupportedOperationException("createSharedConnectionConsumer() is not supported");
}
// Implementation methods
// -------------------------------------------------------------------------
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
connTempQueues.add(tempQueue);
}
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
connTempTopics.add(tempTopic);
}
@Override
public void onSessionClosed(PooledSession session) {
if (session != null) {
this.loanedSessions.remove(session);
}
}
public Connection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
}
protected void assertNotClosed() throws javax.jms.IllegalStateException {
if (stopped || pool == null) {
throw new IllegalStateException("Connection closed");
}
}
protected Session createSession(SessionKey key) throws JMSException {
return getConnection().createSession(key.isTransacted(), key.getAckMode());
}
@Override
public String toString() {
return "PooledConnection { " + pool + " }";
}
/**
* Remove all of the temporary destinations created for this connection.
* This is important since the underlying connection may be reused over a
* long period of time, accumulating all of the temporary destinations from
* each use. However, from the perspective of the lifecycle from the
* client's view, close() closes the connection and, therefore, deletes all
* of the temporary destinations created.
*/
protected void cleanupConnectionTemporaryDestinations() {
for (TemporaryQueue tempQueue : connTempQueues) {
try {
tempQueue.delete();
} catch (JMSException ex) {
LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
}
}
connTempQueues.clear();
for (TemporaryTopic tempTopic : connTempTopics) {
try {
tempTopic.delete();
} catch (JMSException ex) {
LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
}
}
connTempTopics.clear();
}
/**
* The PooledSession tracks all Sessions that it created and now we close them. Closing the
* PooledSession will return the internal Session to the Pool of Session after cleaning up
* all the resources that the Session had allocated for this PooledConnection.
*/
protected void cleanupAllLoanedSessions() {
for (PooledSession session : loanedSessions) {
try {
session.close();
} catch (JMSException ex) {
LOG.info("failed to close loaned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
}
}
loanedSessions.clear();
}
/**
* @return the total number of Pooled session including idle sessions that are not
* currently loaned out to any client.
*/
public int getNumSessions() {
return this.pool.getNumSessions();
}
/**
* @return the number of Sessions that are currently checked out of this Connection's session pool.
*/
public int getNumActiveSessions() {
return this.pool.getNumActiveSessions();
}
/**
* @return the number of Sessions that are idle in this Connection's sessions pool.
*/
public int getNumtIdleSessions() {
return this.pool.getNumIdleSessions();
}
}