blob: 66a642cabec82ddab6df6a12981ff93910a904f7 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.transport.jms;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.transport.base.AbstractTransportListenerEx;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.ManagementSupport;
import org.apache.axis2.transport.base.event.TransportErrorListener;
import org.apache.axis2.transport.base.event.TransportErrorSource;
import org.apache.axis2.transport.base.event.TransportErrorSourceSupport;
/**
* The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances
* for each service requesting exposure over JMS, and stops these if they are undeployed / stopped.
* <p>
* A service indicates a JMS Connection factory definition by name, which would be defined in the
* JMSListner on the axis2.xml, and this provides a way to reuse common configuration between
* services, as well as to optimize resources utilized
* <p>
* If the connection factory name was not specified, it will default to the one named "default"
* {@see JMSConstants.DEFAULT_CONFAC_NAME}
* <p>
* If a destination JNDI name is not specified, a service will expect to use a Queue with the same
* JNDI name as of the service. Additional Parameters allows one to bind to a Topic or specify
* many more detailed control options. See package documentation for more details
* <p>
* All Destinations / JMS Administered objects used MUST be pre-created or already available
*/
public class JMSListener extends AbstractTransportListenerEx<JMSEndpoint> implements ManagementSupport,
TransportErrorSource {
public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;
/** The JMSConnectionFactoryManager which centralizes the management of defined factories */
private JMSConnectionFactoryManager connFacManager;
private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this);
@Override
protected void doInit() throws AxisFault {
connFacManager = new JMSConnectionFactoryManager(getTransportInDescription());
log.info("JMS Transport Receiver/Listener initialized...");
}
@Override
protected JMSEndpoint createEndpoint() {
return new JMSEndpoint(this, workerPool);
}
/**
* Listen for JMS messages on behalf of the given service
*
* @param service the Axis service for which to listen for messages
*/
@Override
protected void startEndpoint(JMSEndpoint endpoint) throws AxisFault {
ServiceTaskManager stm = endpoint.getServiceTaskManager();
stm.start();
for (int i=0; i<3; i++) {
// Check the consumer count rather than the active task count. Reason: if the
// destination is of type topic, then the transport is only ready to receive
// messages if at least one consumer exists. This is of not much importance,
// except for automated tests.
if (stm.getConsumerCount() > 0) {
log.info("Started to listen on destination : " + stm.getDestinationJNDIName() +
" of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
" for service " + stm.getServiceName());
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {}
}
log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() +
" of type " + JMSUtils.getDestinationTypeAsString(stm.getDestinationType()) +
" for service " + stm.getServiceName() + " have not yet started after 3 seconds ..");
}
/**
* Stops listening for messages for the service thats undeployed or stopped
*
* @param service the service that was undeployed or stopped
*/
@Override
protected void stopEndpoint(JMSEndpoint endpoint) {
ServiceTaskManager stm = endpoint.getServiceTaskManager();
if (log.isDebugEnabled()) {
log.debug("Stopping listening on destination : " + stm.getDestinationJNDIName() +
" for service : " + stm.getServiceName());
}
stm.stop();
log.info("Stopped listening for JMS messages to service : " + endpoint.getServiceName());
}
/**
* Return the connection factory name for this service. If this service
* refers to an invalid factory or defaults to a non-existent default
* factory, this returns null
*
* @param service the AxisService
* @return the JMSConnectionFactory to be used, or null if reference is invalid
*/
public JMSConnectionFactory getConnectionFactory(AxisService service) {
Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC);
// validate connection factory name (specified or default)
if (conFacParam != null) {
return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue());
} else {
return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
}
}
// -- jmx/management methods--
/**
* Pause the listener - Stop accepting/processing new messages, but continues processing existing
* messages until they complete. This helps bring an instance into a maintenence mode
* @throws AxisFault on error
*/
@Override
public void pause() throws AxisFault {
if (state != BaseConstants.STARTED) return;
try {
for (JMSEndpoint endpoint : getEndpoints()) {
endpoint.getServiceTaskManager().pause();
}
state = BaseConstants.PAUSED;
log.info("Listener paused");
} catch (AxisJMSException e) {
log.error("At least one service could not be paused", e);
}
}
/**
* Resume the lister - Brings the lister into active mode back from a paused state
* @throws AxisFault on error
*/
@Override
public void resume() throws AxisFault {
if (state != BaseConstants.PAUSED) return;
try {
for (JMSEndpoint endpoint : getEndpoints()) {
endpoint.getServiceTaskManager().resume();
}
state = BaseConstants.STARTED;
log.info("Listener resumed");
} catch (AxisJMSException e) {
log.error("At least one service could not be resumed", e);
}
}
/**
* Stop processing new messages, and wait the specified maximum time for in-flight
* requests to complete before a controlled shutdown for maintenence
*
* @param millis a number of milliseconds to wait until pending requests are allowed to complete
* @throws AxisFault on error
*/
@Override
public void maintenenceShutdown(long millis) throws AxisFault {
if (state != BaseConstants.STARTED) return;
try {
long start = System.currentTimeMillis();
stop();
state = BaseConstants.STOPPED;
log.info("Listener shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s");
} catch (Exception e) {
handleException("Error shutting down the listener for maintenence", e);
}
}
public void addErrorListener(TransportErrorListener listener) {
tess.addErrorListener(listener);
}
public void removeErrorListener(TransportErrorListener listener) {
tess.removeErrorListener(listener);
}
void error(AxisService service, Throwable ex) {
tess.error(service, ex);
}
}