/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.tuscany.sca.binding.ws.axis2.jms; | |
import java.util.HashMap; | |
import java.util.Hashtable; | |
import java.util.Iterator; | |
import java.util.Map; | |
import javax.jms.Connection; | |
import javax.jms.ConnectionFactory; | |
import javax.jms.Destination; | |
import javax.jms.JMSException; | |
import javax.jms.Message; | |
import javax.jms.MessageConsumer; | |
import javax.jms.Queue; | |
import javax.jms.Session; | |
import javax.jms.Topic; | |
import javax.naming.Context; | |
import javax.naming.InitialContext; | |
import javax.naming.NameNotFoundException; | |
import javax.naming.NamingException; | |
import org.apache.axis2.addressing.EndpointReference; | |
import org.apache.axis2.transport.jms.JMSConstants; | |
import org.apache.axis2.transport.jms.JMSUtils; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.tuscany.sca.work.WorkScheduler; | |
/** | |
* Encapsulate a JMS Connection factory definition within an Axis2.xml | |
* <p/> | |
* More than one JMS connection factory could be defined within an Axis2 XML | |
* specifying the JMSListener as the transportReceiver. | |
* <p/> | |
* These connection factories are created at the initialization of the | |
* transportReceiver, and any service interested in using any of these could | |
* specify the name of the factory and the destination through Parameters named | |
* JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below. | |
* <p/> | |
* <parameter name="transport.jms.ConnectionFactory" locked="true">myQueueConnectionFactory</parameter> | |
* <parameter name="transport.jms.Destination" locked="true">TestQueue</parameter> | |
* <p/> | |
* If a connection factory is defined by a parameter named | |
* JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not | |
* explicitly specify a connection factory will be defaulted to it - if it is | |
* defined in the Axis2 configuration. | |
* <p/> | |
* e.g. | |
* <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"> | |
* <parameter name="myTopicConnectionFactory" locked="false"> | |
* <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> | |
* <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter> | |
* <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter> | |
* <parameter name="transport.jms.Destination" locked="false">myTopicOne, myTopicTwo</parameter> | |
* </parameter> | |
* <parameter name="myQueueConnectionFactory" locked="false"> | |
* <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> | |
* <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter> | |
* <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> | |
* <parameter name="transport.jms.Destination" locked="false">myQueueOne, myQueueTwo</parameter> | |
* </parameter> | |
* <parameter name="default" locked="false"> | |
* <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> | |
* <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter> | |
* <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter> | |
* <parameter name="transport.jms.Destination" locked="false">myDestinationOne, myDestinationTwo</parameter> | |
* </parameter> | |
* </transportReceiver> | |
*/ | |
public class JMSConnectionFactory { | |
private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); | |
/** | |
* The name used for the connection factory definition within Axis2 | |
*/ | |
private String name = null; | |
/** | |
* The JNDI name of the actual connection factory | |
*/ | |
private String jndiName = null; | |
/** | |
* The JNDI name of the actual connection factory username | |
*/ | |
private String jndiUser = null; | |
/** | |
* The JNDI name of the actual connection factory password | |
*/ | |
private String jndiPass = null; | |
/** | |
* Map of destination JNDI names to service names | |
*/ | |
private Map serviceJNDINameMapping = null; | |
/** | |
* Map of destinations to service names | |
*/ | |
private Map serviceDestinationMapping = null; | |
/** | |
* The JMS Sessions listening for messages | |
*/ | |
private Map jmsSessions = null; | |
/** | |
* Properties of the connection factory | |
*/ | |
private Hashtable properties = null; | |
/** | |
* The JNDI Context used | |
*/ | |
private Context context = null; | |
/** | |
* The actual ConnectionFactory instance held within | |
*/ | |
private ConnectionFactory conFactory = null; | |
/** | |
* The JMS Connection is opened. | |
*/ | |
private Connection connection = null; | |
/** | |
* The JMS Message receiver for this connection factory | |
*/ | |
private JMSMessageReceiver msgRcvr = null; | |
/** | |
* The actual password for the connection factory after retrieval from JNDI. | |
* If this is not supplied, the OS username will be used by default | |
*/ | |
private String user = null; | |
/** | |
* The actual password for the connection factory after retrieval from JNDI. | |
* If this is not supplied, the OS credentials will be used by default | |
*/ | |
private String pass = null; | |
private WorkScheduler workScheduler; | |
private boolean consumerRunning; | |
/** | |
* Create a JMSConnectionFactory for the given Axis2 name and JNDI name | |
* | |
* @param name the local Axis2 name of the connection factory | |
* @param jndiName the JNDI name of the actual connection factory used | |
*/ | |
JMSConnectionFactory(String name, String jndiName, WorkScheduler workScheduler) { | |
this.name = name; | |
this.jndiName = jndiName; | |
serviceJNDINameMapping = new HashMap(); | |
serviceDestinationMapping = new HashMap(); | |
properties = new Hashtable(); | |
jmsSessions = new HashMap(); | |
this.workScheduler = workScheduler; | |
} | |
/** | |
* Create a JMSConnectionFactory for the given Axis2 name | |
* | |
* @param name the local Axis2 name of the connection factory | |
*/ | |
JMSConnectionFactory(String name, WorkScheduler workScheduler) { | |
this(name, null, workScheduler); | |
} | |
/** | |
* Connect to the actual JMS connection factory specified by the JNDI name | |
* | |
* @throws NamingException if the connection factory cannot be found | |
*/ | |
public void connect() throws NamingException { | |
if (context == null) { | |
createInitialContext(); | |
} | |
conFactory = (ConnectionFactory) context.lookup(jndiName); | |
if (jndiUser != null) | |
user = (String ) context.lookup(jndiUser); | |
if (jndiPass != null) | |
pass = (String ) context.lookup(jndiPass); | |
log.debug("Connected to the actual connection factory : " + jndiName); | |
} | |
/** | |
* Creates the initial context using the set properties | |
* | |
* @throws NamingException | |
*/ | |
private void createInitialContext() throws NamingException { | |
context = new InitialContext(properties); | |
} | |
/** | |
* Set the JNDI connection factory name | |
* | |
* @param jndiName | |
*/ | |
public void setJndiName(String jndiName) { | |
this.jndiName = jndiName; | |
} | |
/** | |
* Get the JNDI name of the actual factory username | |
* | |
* @return the jndi name of the actual connection factory username | |
*/ | |
public void setJndiUser(String jndiUser) { | |
this.jndiUser = jndiUser; | |
} | |
/** | |
* Get the JNDI name of the actual factory password | |
* | |
* @return the jndi name of the actual connection factory password | |
*/ | |
public void setJndiPass(String jndiPass) { | |
this.jndiPass = jndiPass; | |
} | |
/** | |
* Add a listen destination on this connection factory on behalf of the given service | |
* | |
* @param destinationJndi destination JNDI name | |
* @param serviceName the service to which it belongs | |
*/ | |
public void addDestination(String destinationJndi, String serviceName) { | |
serviceJNDINameMapping.put(destinationJndi, serviceName); | |
String destinationName = getDestinationName(destinationJndi); | |
if (destinationName == null) { | |
log.warn("JMS Destination with JNDI name : " + destinationJndi + " does not exist"); | |
Connection con = null; | |
try { | |
if ((jndiUser == null) || (jndiPass == null)){ | |
// Use the OS username and credentials | |
con = conFactory.createConnection(); | |
} else{ | |
// use an explicit username and password | |
con = conFactory.createConnection(user, pass); | |
} | |
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
Queue queue = session.createQueue(destinationJndi); | |
destinationName = queue.getQueueName(); | |
log.warn("JMS Destination with JNDI name : " + destinationJndi + " created"); | |
} catch (JMSException e) { | |
log.error("Unable to create a Destination with JNDI name : " + destinationJndi, e); | |
// mark service as faulty | |
JMSUtils.markServiceAsFaulty( | |
(String) serviceJNDINameMapping.get(destinationJndi), | |
"Error looking up JMS destination : " + destinationJndi, | |
msgRcvr.getAxisConf().getAxisConfiguration()); | |
} finally { | |
if (con != null) { | |
try { | |
con.close(); | |
} catch (JMSException ignore) {} | |
} | |
} | |
} | |
serviceDestinationMapping.put(destinationName, serviceName); | |
log.info("Mapping JNDI name : " + destinationJndi + " and JMS Destination name : " + | |
destinationName + " against service : " + serviceName); | |
} | |
/** | |
* Remove listen destination on this connection factory | |
* | |
* @param destinationJndi the JMS destination to be removed | |
* @throws if an error occurs trying to stop listening for messages before removal | |
*/ | |
public void removeDestination(String destinationJndi) throws JMSException { | |
// find and save provider specific Destination name before we delete | |
String providerSpecificDestination = getDestinationName(destinationJndi); | |
stoplistenOnDestination(destinationJndi); | |
serviceJNDINameMapping.remove(destinationJndi); | |
if (providerSpecificDestination != null) { | |
serviceDestinationMapping.remove(providerSpecificDestination); | |
} | |
} | |
/** | |
* Add a property to the connection factory | |
* | |
* @param key | |
* @param value | |
*/ | |
public void addProperty(String key, String value) { | |
properties.put(key, value); | |
} | |
/** | |
* Return the name of the connection factory | |
* | |
* @return the Axis2 name of this factory | |
*/ | |
public String getName() { | |
return name; | |
} | |
/** | |
* Get the JNDI name of the actual factory | |
* | |
* @return the jndi name of the actual connection factory | |
*/ | |
public String getJndiName() { | |
return jndiName; | |
} | |
/** | |
* Get the JNDI name of the actual factory username | |
* | |
* @return the jndi name of the actual connection factory username | |
*/ | |
public String getJndiUser() { | |
return jndiUser; | |
} | |
/** | |
* Get the JNDI name of the actual factory password | |
* | |
* @return the jndi name of the actual connection factory password | |
*/ | |
public String getJndiPass() { | |
return jndiPass; | |
} | |
/** | |
* This is the real password for the connection factory after the JNDI lookup | |
* | |
* @return the real password for the connection factory after the JNDI lookup | |
*/ | |
public String getPass() { | |
return pass; | |
} | |
/** | |
* This is the real username for the connection factory after the JNDI lookup | |
* | |
* @return the eal username for the connection factory after the JNDI lookup | |
*/ | |
public String getUser() { | |
return user; | |
} | |
/** | |
* Get the actual underlying connection factory | |
* | |
* @return actual connection factory | |
*/ | |
public ConnectionFactory getConFactory() { | |
return conFactory; | |
} | |
/** | |
* Get the list of destinations (JNDI) associated with this connection factory | |
* | |
* @return destinations to service maping | |
*/ | |
public Map getDestinations() { | |
return serviceJNDINameMapping; | |
} | |
/** | |
* Get the connection factory properties | |
* | |
* @return properties | |
*/ | |
public Hashtable getProperties() { | |
return properties; | |
} | |
/** | |
* Begin listening for messages on the list of destinations associated | |
* with this connection factory. (Called during Axis2 initialization of | |
* the Transport receivers) | |
* | |
* @param msgRcvr the message receiver which will process received messages | |
* @throws JMSException on exceptions | |
*/ | |
public void listen(JMSMessageReceiver msgRcvr) throws JMSException { | |
// save a reference to the message receiver | |
this.msgRcvr = msgRcvr; | |
log.debug("Connection factory : " + name + " initializing..."); | |
if (conFactory == null || context == null) { | |
handleException( | |
"Connection factory must be 'connected' before listening"); | |
} else { | |
try { | |
if ((jndiUser == null) || (jndiPass == null)){ | |
// User the OS username and credentials | |
connection = conFactory.createConnection(); | |
} else{ | |
// use an explicit username and password | |
connection = conFactory.createConnection(user, pass); | |
} | |
} catch (JMSException e) { | |
handleException("Error creating a JMS connection using the " + | |
"factory : " + jndiName, e); | |
} | |
} | |
Iterator iter = serviceJNDINameMapping.keySet().iterator(); | |
while (iter.hasNext()) { | |
String destinationJndi = (String) iter.next(); | |
listenOnDestination(destinationJndi); | |
} | |
// start the connection | |
if (!consumerRunning) { | |
connection.start(); | |
} | |
log.info("Connection factory : " + name + " initialized..."); | |
} | |
/** | |
* Listen on the given destination from this connection factory. Used to | |
* start listening on a destination associated with a newly deployed service | |
* | |
* @param destinationJndi the JMS destination to listen on | |
* @throws JMSException on exception | |
*/ | |
public void listenOnDestination(String destinationJndi) throws JMSException { | |
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
Destination destination = null; | |
try { | |
Object o = context.lookup(destinationJndi); | |
destination = (Destination) o; | |
} catch (NameNotFoundException e) { | |
log.warn("Cannot find destination : " + destinationJndi + | |
" Creating a Queue with this name"); | |
destination = session.createQueue(destinationJndi); | |
} catch (NamingException e) { | |
log.warn("Error looking up destination : " + destinationJndi, e); | |
// mark service as faulty | |
JMSUtils.markServiceAsFaulty( | |
(String) serviceJNDINameMapping.get(destinationJndi), | |
"Error looking up JMS destination : " + destinationJndi, | |
this.msgRcvr.getAxisConf().getAxisConfiguration()); | |
} | |
MessageConsumer consumer = session.createConsumer(destination); | |
// consumer.setMessageListener(this.msgRcvr); replace with new Tuscany method: | |
registerMessageReceiver(consumer, this.msgRcvr); | |
jmsSessions.put(destinationJndi, session); | |
} | |
private void registerMessageReceiver(final MessageConsumer consumer, final JMSMessageReceiver messageReceiver) throws JMSException { | |
try { | |
consumer.setMessageListener(messageReceiver); | |
} catch (javax.jms.JMSException e) { | |
// setMessageListener not allowed in JEE container so use Tuscany threads | |
connection.start(); | |
consumerRunning = true; | |
workScheduler.scheduleWork(new Runnable() { | |
public void run() { | |
try { | |
while (consumerRunning) { | |
final Message msg = consumer.receive(); | |
if (msg != null) { | |
workScheduler.scheduleWork(new Runnable() { | |
public void run() { | |
try { | |
messageReceiver.onMessage(msg); | |
} catch (Exception e) { | |
log.error("Exception on message receiver thread", e); | |
} | |
} | |
}); | |
} | |
} | |
} catch (Exception e) { | |
log.error("Exception on consumer receive thread", e); | |
} | |
} | |
}); | |
} | |
} | |
/** | |
* Stop listening on the given destination - for undeployment of services | |
* | |
* @param destinationJndi the JNDI name of the JMS destination | |
* @throws JMSException on exception | |
*/ | |
private void stoplistenOnDestination(String destinationJndi) throws JMSException { | |
((Session) jmsSessions.get(destinationJndi)).close(); | |
} | |
/** | |
* Return the service name using this destination | |
* | |
* @param destination the destination name | |
* @return the service which uses the given destination, or null | |
*/ | |
public String getServiceNameForDestination(String destination) { | |
return (String) serviceJNDINameMapping.get(destination); | |
} | |
/** | |
* Close all connections, sessions etc.. and stop this connection factory | |
*/ | |
public void stop() { | |
try { | |
consumerRunning = false; | |
connection.close(); | |
} catch (JMSException e) { | |
log.warn("Error shutting down connection factory : " + name, e); | |
} | |
} | |
/** | |
* Return the provider specific Destination name if any for the destination with the given | |
* JNDI name | |
* @param destinationJndi the JNDI name of the destination | |
* @return the provider specific Destination name or null if cannot be found | |
*/ | |
public String getDestinationName(String destinationJndi) { | |
try { | |
Destination destination = (Destination) context.lookup(destinationJndi); | |
if (destination != null && destination instanceof Queue) { | |
return ((Queue) destination).getQueueName(); | |
} else if (destination != null && destination instanceof Topic) { | |
return ((Topic) destination).getTopicName(); | |
} | |
} catch (JMSException e) { | |
log.warn("Error reading provider specific JMS destination name for destination " + | |
"with JNDI name : " + destinationJndi, e); | |
} catch (NamingException e) { | |
log.warn("Error looking up destination with JNDI name : " + destinationJndi + | |
" to map its corresponding provider specific Destination name"); | |
} | |
return null; | |
} | |
/** | |
* Return the EPR for the JMS Destination with the given JNDI name and using | |
* this connection factory | |
* @param destination the JNDI name of the JMS Destionation | |
* @return the EPR | |
*/ | |
public EndpointReference getEPRForDestination(String destination) { | |
StringBuffer sb = new StringBuffer(); | |
sb.append(JMSConstants.JMS_PREFIX).append(destination); | |
sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). | |
append("=").append(getJndiName()); | |
Iterator props = getProperties().keySet().iterator(); | |
while (props.hasNext()) { | |
String key = (String) props.next(); | |
String value = (String) getProperties().get(key); | |
sb.append("&").append(key).append("=").append(value); | |
} | |
return new EndpointReference(sb.toString()); | |
} | |
public String getServiceByDestination(String destinationName) { | |
return (String) serviceDestinationMapping.get(destinationName); | |
} | |
private void handleException(String msg) throws AxisJMSException { | |
log.error(msg); | |
throw new AxisJMSException(msg); | |
} | |
private void handleException(String msg, Exception e) throws AxisJMSException { | |
log.error(msg, e); | |
throw new AxisJMSException(msg, e); | |
} | |
} |