| /* |
| * Copyright 2004,2005 The Apache Software Foundation. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.axis2.transport.jms; |
| |
| import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; |
| import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; |
| import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; |
| import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; |
| import org.apache.axiom.om.OMElement; |
| import org.apache.axis2.AxisFault; |
| import org.apache.axis2.addressing.EndpointReference; |
| import org.apache.axis2.context.ConfigurationContext; |
| import org.apache.axis2.description.*; |
| import org.apache.axis2.engine.AxisConfiguration; |
| import org.apache.axis2.engine.AxisEvent; |
| import org.apache.axis2.engine.AxisObserver; |
| import org.apache.axis2.transport.TransportListener; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import javax.jms.JMSException; |
| import javax.naming.Context; |
| import javax.naming.NamingException; |
| import java.util.*; |
| |
| /** |
| * The JMS Transport listener implementation. A JMS Listner will hold one or |
| * more JMS connection factories, which would be created at initialization |
| * time. This implementation does not support the creation of connection |
| * factories at runtime. This JMS Listener registers with Axis to be notified |
| * of service deployment/undeployment/start and stop, and enables or disables |
| * listening for messages on the destinations as appropriate. |
| * <p/> |
| * A Service could state the JMS connection factory name and the destination |
| * name for use as Parameters in its services.xml as shown in the example |
| * below. If the connection name was not specified, it will use the connection |
| * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a |
| * factory is defined in the Axis2.xml. If the destination name is not specified |
| * it will default to a JMS queue by the name of the service. If the destination |
| * should be a Topic, it should be created on the JMS implementation, and |
| * specified in the services.xml of the service. |
| * <p/> |
| * <parameter name="transport.jms.ConnectionFactory" locked="true"> |
| * myTopicConnectionFactory</parameter> |
| * <parameter name="transport.jms.Destination" locked="true"> |
| * dynamicTopics/something.TestTopic</parameter> |
| */ |
| public class JMSListener implements TransportListener { |
| |
| private static final Log log = LogFactory.getLog(JMSListener.class); |
| |
| /** |
| * The maximum number of threads used for the worker thread pool |
| */ |
| private static final int WORKERS_MAX_THREADS = 100; |
| /** |
| * The keep alive time of an idle worker thread |
| */ |
| private static final long WORKER_KEEP_ALIVE = 60L; |
| /** |
| * The worker thread timeout time unit |
| */ |
| private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; |
| |
| /** |
| * A Map containing the connection factories managed by this, keyed by name |
| */ |
| private Map connectionFactories = new HashMap(); |
| /** |
| * A Map of service name to the JMS EPR addresses |
| */ |
| private Map serviceNameToEprMap = new HashMap(); |
| /** |
| * The Axis2 Configuration context |
| */ |
| private ConfigurationContext axisConf = null; |
| |
| /** |
| * This is the TransportListener initialization method invoked by Axis2 |
| * |
| * @param axisConf the Axis configuration context |
| * @param transprtIn the TransportIn description |
| */ |
| public void init(ConfigurationContext axisConf, |
| TransportInDescription transprtIn) { |
| |
| // save reference to the configuration context |
| this.axisConf = axisConf; |
| |
| // initialize the defined connection factories |
| initializeConnectionFactories(transprtIn); |
| |
| // if no connection factories are defined, we cannot listen |
| if (connectionFactories.isEmpty()) { |
| log.warn("No JMS connection factories are defined." + |
| "Will not listen for any JMS messages"); |
| return; |
| } |
| |
| // iterate through deployed services and validate connection factory |
| // names, and mark services as faulty where appropriate. |
| Iterator services = |
| axisConf.getAxisConfiguration().getServices().values().iterator(); |
| |
| while (services.hasNext()) { |
| AxisService service = (AxisService) services.next(); |
| if (JMSUtils.isJMSService(service)) { |
| processService(service); |
| } |
| } |
| |
| // register to receive updates on services for lifetime management |
| axisConf.getAxisConfiguration().addObservers(new JMSAxisObserver()); |
| |
| log.info("JMS Transport Receiver (Listener) initialized..."); |
| } |
| |
| |
| /** |
| * Prepare to listen for JMS messages on behalf of this service |
| * |
| * @param service |
| */ |
| private void processService(AxisService service) { |
| JMSConnectionFactory cf = getConnectionFactory(service); |
| if (cf == null) { |
| String msg = "Service " + service.getName() + " does not specify" + |
| "a JMS connection factory or refers to an invalid factory. " + |
| "This service is being marked as faulty and will not be " + |
| "available over the JMS transport"; |
| log.warn(msg); |
| JMSUtils.markServiceAsFaulty( |
| service.getName(), msg, service.getAxisConfiguration()); |
| return; |
| } |
| |
| String destination = JMSUtils.getDestination(service); |
| |
| // compute service EPR and keep for later use |
| serviceNameToEprMap.put(service.getName(), getEPR(cf, destination)); |
| |
| // add the specified or implicit destination of this service |
| // to its connection factory |
| cf.addDestination(destination, service.getName()); |
| } |
| |
| /** |
| * Return the connection factory name for this service. If this service |
| * refers to an invalid factory or defaults to a non-existent default |
| * factory, this returns null |
| * |
| * @param service the AxisService |
| * @return the JMSConnectionFactory to be used, or null if reference is invalid |
| */ |
| private JMSConnectionFactory getConnectionFactory(AxisService service) { |
| Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM); |
| |
| // validate connection factory name (specified or default) |
| if (conFacParam != null) { |
| String conFac = (String) conFacParam.getValue(); |
| if (connectionFactories.containsKey(conFac)) { |
| return (JMSConnectionFactory) connectionFactories.get(conFac); |
| } else { |
| return null; |
| } |
| |
| } else if (connectionFactories.containsKey(JMSConstants.DEFAULT_CONFAC_NAME)) { |
| return (JMSConnectionFactory) connectionFactories. |
| get(JMSConstants.DEFAULT_CONFAC_NAME); |
| |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Initialize the defined connection factories, parsing the TransportIn |
| * descriptions |
| * |
| * @param transprtIn The Axis2 Transport in for the JMS |
| */ |
| private void initializeConnectionFactories(TransportInDescription transprtIn) { |
| // iterate through all defined connection factories |
| Iterator conFacIter = transprtIn.getParameters().iterator(); |
| |
| while (conFacIter.hasNext()) { |
| |
| Parameter param = (Parameter) conFacIter.next(); |
| JMSConnectionFactory jmsConFactory = |
| new JMSConnectionFactory(param.getName()); |
| |
| ParameterIncludeImpl pi = new ParameterIncludeImpl(); |
| try { |
| pi.deserializeParameters((OMElement) param.getValue()); |
| } catch (AxisFault axisFault) { |
| handleException("Error reading Parameters for JMS connection " + |
| "factory" + jmsConFactory.getName(), axisFault); |
| } |
| |
| // read connection facotry properties |
| Iterator params = pi.getParameters().iterator(); |
| |
| while (params.hasNext()) { |
| Parameter p = (Parameter) params.next(); |
| |
| if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { |
| jmsConFactory.addProperty( |
| Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); |
| } else if (Context.PROVIDER_URL.equals(p.getName())) { |
| jmsConFactory.addProperty( |
| Context.PROVIDER_URL, (String) p.getValue()); |
| } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { |
| jmsConFactory.addProperty( |
| Context.SECURITY_PRINCIPAL, (String) p.getValue()); |
| } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { |
| jmsConFactory.addProperty( |
| Context.SECURITY_CREDENTIALS, (String) p.getValue()); |
| } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { |
| jmsConFactory.setJndiName((String) p.getValue()); |
| } else if (JMSConstants.DEST_PARAM.equals(p.getName())) { |
| StringTokenizer st = |
| new StringTokenizer((String) p.getValue(), " ,"); |
| while (st.hasMoreTokens()) { |
| jmsConFactory.addDestination(st.nextToken(), null); |
| } |
| } |
| } |
| |
| // connect to the actual connection factory |
| try { |
| jmsConFactory.connect(); |
| connectionFactories.put(jmsConFactory.getName(), jmsConFactory); |
| } catch (NamingException e) { |
| handleException("Error connecting to JMS connection factory : " + |
| jmsConFactory.getJndiName(), e); |
| } |
| } |
| } |
| |
| /** |
| * Get the EPR for the given JMS connection factory and destination |
| * the form of the URL is |
| * jms:/<destination>?[<key>=<value>&]* |
| * |
| * @param cf the Axis2 JMS connection factory |
| * @param destination the JNDI name of the destination |
| * @return the EPR as a String |
| */ |
| private static String getEPR(JMSConnectionFactory cf, String destination) { |
| StringBuffer sb = new StringBuffer(); |
| sb.append(JMSConstants.JMS_PREFIX).append(destination); |
| sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). |
| append("=").append(cf.getJndiName()); |
| Iterator props = cf.getProperties().keySet().iterator(); |
| while (props.hasNext()) { |
| String key = (String) props.next(); |
| String value = (String) cf.getProperties().get(key); |
| sb.append("&").append(key).append("=").append(value); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Start this JMS Listener (Transport Listener) |
| * |
| * @throws AxisFault |
| */ |
| public void start() throws AxisFault { |
| // create thread pool of workers |
| ExecutorService workerPool = new ThreadPoolExecutor( |
| 1, |
| WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, |
| new LinkedBlockingQueue(), |
| new org.apache.axis2.util.threadpool.DefaultThreadFactory( |
| new ThreadGroup("JMS Worker thread group"), |
| "JMSWorker")); |
| |
| Iterator iter = connectionFactories.values().iterator(); |
| while (iter.hasNext()) { |
| JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next(); |
| JMSMessageReceiver msgRcvr = |
| new JMSMessageReceiver(conFac, workerPool, axisConf); |
| |
| try { |
| conFac.listen(msgRcvr); |
| } catch (JMSException e) { |
| handleException("Error starting connection factory : " + |
| conFac.getName(), e); |
| } |
| } |
| } |
| |
| /** |
| * Stop this transport listener and shutdown all of the connection factories |
| */ |
| public void stop() { |
| Iterator iter = connectionFactories.values().iterator(); |
| while (iter.hasNext()) { |
| ((JMSConnectionFactory) iter.next()).stop(); |
| } |
| } |
| |
| /** |
| * Returns EPRs for the given service and IP. (Picks up precomputed EPR) |
| * |
| * @param serviceName service name |
| * @param ip ignored |
| * @return the EPR for the service |
| * @throws AxisFault not used |
| */ |
| public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { |
| //Strip out the operation name |
| if (serviceName.indexOf('/') != -1) { |
| serviceName = serviceName.substring(0, serviceName.indexOf('/')); |
| } |
| return new EndpointReference[]{new EndpointReference((String) serviceNameToEprMap.get(serviceName))}; |
| } |
| |
| /** |
| * Returns the EPR for the given service and IP. (Picks up precomputed EPR) |
| * |
| * @param serviceName service name |
| * @param ip ignored |
| * @return the EPR for the service |
| * @throws AxisFault not used |
| */ |
| public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { |
| return getEPRsForService(serviceName, ip)[0]; |
| } |
| |
| /** |
| * Starts listening for messages on this service |
| * |
| * @param service the AxisService just deployed |
| */ |
| private void startListeningForService(AxisService service) { |
| processService(service); |
| JMSConnectionFactory cf = getConnectionFactory(service); |
| if (cf == null) { |
| String msg = "Service " + service.getName() + " does not specify" + |
| "a JMS connection factory or refers to an invalid factory." + |
| "This service is being marked as faulty and will not be " + |
| "available over the JMS transport"; |
| log.warn(msg); |
| JMSUtils.markServiceAsFaulty( |
| service.getName(), msg, service.getAxisConfiguration()); |
| return; |
| } |
| |
| String destination = JMSUtils.getDestination(service); |
| try { |
| cf.listenOnDestination(destination); |
| log.info("Started listening on destination : " + destination + |
| " for service " + service.getName()); |
| |
| } catch (JMSException e) { |
| handleException( |
| "Could not listen on JMS for service " + service.getName(), e); |
| JMSUtils.markServiceAsFaulty( |
| service.getName(), e.getMessage(), service.getAxisConfiguration()); |
| } |
| } |
| |
| /** |
| * Stops listening for messages for the service undeployed |
| * |
| * @param service the AxisService just undeployed |
| */ |
| private void stopListeningForService(AxisService service) { |
| |
| JMSConnectionFactory cf = getConnectionFactory(service); |
| if (cf == null) { |
| String msg = "Service " + service.getName() + " does not specify" + |
| "a JMS connection factory or refers to an invalid factory." + |
| "This service is being marked as faulty and will not be " + |
| "available over the JMS transport"; |
| log.warn(msg); |
| JMSUtils.markServiceAsFaulty( |
| service.getName(), msg, service.getAxisConfiguration()); |
| return; |
| } |
| |
| // remove from the serviceNameToEprMap |
| serviceNameToEprMap.remove(service.getName()); |
| |
| String destination = JMSUtils.getDestination(service); |
| try { |
| cf.removeDestination(destination); |
| } catch (JMSException e) { |
| handleException( |
| "Error while terminating listening on JMS destination : " + destination, e); |
| } |
| } |
| |
| private void handleException(String msg, Exception e) { |
| log.error(msg, e); |
| throw new AxisJMSException(msg, e); |
| } |
| |
| /** |
| * An AxisObserver which will start listening for newly deployed services, |
| * and stop listening when services are undeployed. |
| */ |
| class JMSAxisObserver implements AxisObserver { |
| |
| // The initilization code will go here |
| public void init(AxisConfiguration axisConfig) { |
| } |
| |
| public void serviceUpdate(AxisEvent event, AxisService service) { |
| |
| if (JMSUtils.isJMSService(service)) { |
| switch (event.getEventType()) { |
| case AxisEvent.SERVICE_DEPLOY : |
| startListeningForService(service); |
| break; |
| case AxisEvent.SERVICE_REMOVE : |
| stopListeningForService(service); |
| break; |
| case AxisEvent.SERVICE_START : |
| startListeningForService(service); |
| break; |
| case AxisEvent.SERVICE_STOP : |
| stopListeningForService(service); |
| break; |
| } |
| } |
| } |
| |
| public void moduleUpdate(AxisEvent event, AxisModule module) { |
| } |
| |
| //-------------------------------------------------------- |
| public void addParameter(Parameter param) throws AxisFault { |
| } |
| |
| public void removeParameter(Parameter param) throws AxisFault { |
| } |
| |
| public void deserializeParameters(OMElement parameterElement) throws AxisFault { |
| } |
| |
| public Parameter getParameter(String name) { |
| return null; |
| } |
| |
| public ArrayList getParameters() { |
| return null; |
| } |
| |
| public boolean isParameterLocked(String parameterName) { |
| return false; |
| } |
| |
| public void serviceGroupUpdate(AxisEvent event, AxisServiceGroup serviceGroup) { |
| } |
| } |
| |
| public ConfigurationContext getConfigurationContext() { |
| return this.axisConf; |
| } |
| } |