blob: 28e6c64fdf4013b34d21b096ff9e6a94bc1d128e [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.ofbiz.service.jms;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XAQueueSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.transaction.xa.XAResource;
import org.ofbiz.base.config.GenericConfigException;
import org.ofbiz.base.util.Debug;
import org.ofbiz.base.util.GeneralException;
import org.ofbiz.base.util.JNDIContextFactory;
import org.ofbiz.base.util.UtilValidate;
import org.ofbiz.base.util.UtilXml;
import org.ofbiz.entity.transaction.GenericTransactionException;
import org.ofbiz.entity.transaction.TransactionUtil;
import org.ofbiz.service.GenericRequester;
import org.ofbiz.service.GenericServiceException;
import org.ofbiz.service.ModelService;
import org.ofbiz.service.ServiceDispatcher;
import org.ofbiz.service.ServiceUtil;
import org.ofbiz.service.config.ServiceConfigUtil;
import org.ofbiz.service.config.model.JmsService;
import org.ofbiz.service.config.model.Server;
import org.ofbiz.service.engine.AbstractEngine;
import org.w3c.dom.Element;
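/**
* JmsServiceEngine - service engine that sends a service invocation as a JMS message
* to each topic/queue server configured for the service's location.
*/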
public class JmsServiceEngine extends AbstractEngine {
/** Fully-qualified class name, passed as the module argument to the {@code Debug} logging calls in this class. */
public static final String module = JmsServiceEngine.class.getName();
/**
 * Creates the engine for the given dispatcher.
 *
 * @param dispatcher the service dispatcher, handed unchanged to the {@code AbstractEngine} constructor
 */
public JmsServiceEngine(ServiceDispatcher dispatcher) {
super(dispatcher);
}
/**
 * Looks up the JMS service configuration that belongs to a service definition.
 * The location resolved by {@code getLocation} is used as the JMS service name.
 *
 * @param modelService the service definition being invoked
 * @return whatever {@code getJmsServiceByName} returns for the service's location
 * @throws GenericServiceException wrapping a {@code GenericConfigException} raised while
 *         reading the service engine configuration
 */
protected JmsService getServiceElement(ModelService modelService) throws GenericServiceException {
    final String jmsServiceName = this.getLocation(modelService);
    JmsService jmsService;
    try {
        jmsService = ServiceConfigUtil.getServiceEngine().getJmsServiceByName(jmsServiceName);
    } catch (GenericConfigException configProblem) {
        throw new GenericServiceException(configProblem);
    }
    return jmsService;
}
/**
 * Builds the JMS message that carries a service invocation.
 * The message is a {@code MapMessage} with two string entries: {@code serviceName}
 * (the model's {@code invoke} value) and {@code serviceContext} (the context as
 * serialized by {@code JmsSerializer}).
 *
 * @param session the JMS session used to create the message
 * @param modelService the service definition being invoked
 * @param context the service input context to serialize into the message
 * @return the populated map message
 * @throws GenericServiceException if {@code getParameterNames(OUT_PARAM, false)} yields any
 *         names, or if the context cannot be serialized
 * @throws JMSException if the session fails to create or populate the message
 */
protected Message makeMessage(Session session, ModelService modelService, Map<String, Object> context)
        throws GenericServiceException, JMSException {
    // Nothing is returned over JMS, so a service that declares such OUT parameters is rejected.
    List<String> declaredOutNames = modelService.getParameterNames(ModelService.OUT_PARAM, false);
    if (UtilValidate.isNotEmpty(declaredOutNames)) {
        throw new GenericServiceException("JMS service cannot have required OUT parameters; no parameters will be returned.");
    }

    final String serializedContext;
    try {
        if (Debug.verboseOn()) {
            Debug.logVerbose("Serializing Context --> " + context, module);
        }
        serializedContext = JmsSerializer.serialize(context);
    } catch (Exception serializeProblem) {
        throw new GenericServiceException("Cannot serialize context.", serializeProblem);
    }

    MapMessage mapMessage = session.createMapMessage();
    mapMessage.setString("serviceName", modelService.invoke);
    mapMessage.setString("serviceContext", serializedContext);
    return mapMessage;
}
/**
 * Selects the {@code server} child elements to send to, based on the element's
 * {@code send-mode} attribute.
 *
 * @param serviceElement the XML element holding the {@code send-mode} attribute and
 *        the {@code server} children
 * @return every {@code server} child for mode {@code all}; a new empty list for mode {@code none}
 * @throws GenericServiceException for any other send mode
 */
protected List<? extends Element> serverList(Element serviceElement) throws GenericServiceException {
    final String mode = serviceElement.getAttribute("send-mode");
    final List<? extends Element> servers = UtilXml.childElementList(serviceElement, "server");

    if (mode.equals("all")) {
        return servers;
    }
    if (mode.equals("none")) {
        return new ArrayList<Element>();
    }
    throw new GenericServiceException("Requested send mode not supported.");
}
/**
 * Publishes the service invocation to a JMS topic.
 * The connection factory and the topic are both looked up through JNDI using the
 * names taken from the server configuration.
 *
 * @param modelService the service definition being invoked
 * @param context the service input context, serialized into the message by {@code makeMessage}
 * @param server the server configuration supplying JNDI names, credentials and client id
 * @return the map produced by {@code ServiceUtil.returnSuccess()}
 * @throws GenericServiceException if the JNDI lookups fail, the message cannot be built,
 *         or the JMS provider reports an error
 */
protected Map<String, Object> runTopic(ModelService modelService, Map<String, Object> context, Server server) throws GenericServiceException {
    String serverName = server.getJndiServerName();
    String jndiName = server.getJndiName();
    String topicName = server.getTopicQueue();
    String userName = server.getUsername();
    String password = server.getPassword();
    String clientId = server.getClientId();

    InitialContext jndi = null;
    TopicConnectionFactory factory = null;
    TopicConnection con = null;

    try {
        jndi = JNDIContextFactory.getInitialContext(serverName);
        factory = (TopicConnectionFactory) jndi.lookup(jndiName);
    } catch (GeneralException ge) {
        throw new GenericServiceException("Problems getting JNDI InitialContext.", ge.getNested());
    } catch (NamingException ne) {
        // The lookup failed: drop the context JNDIContextFactory holds for this server and retry once.
        JNDIContextFactory.clearInitialContext(serverName);
        try {
            jndi = JNDIContextFactory.getInitialContext(serverName);
            factory = (TopicConnectionFactory) jndi.lookup(jndiName);
        } catch (GeneralException ge2) {
            throw new GenericServiceException("Problems getting JNDI InitialContext.", ge2.getNested());
        } catch (NamingException ne2) {
            // Report the failure of the retry (as the queue variants do), not the first attempt.
            throw new GenericServiceException("JNDI lookup problems.", ne2);
        }
    }

    try {
        con = factory.createTopicConnection(userName, password);

        // Any non-empty client id is applied (a one-character id used to be ignored).
        if (clientId != null && clientId.length() > 0) {
            con.setClientID(clientId);
        }
        con.start();

        TopicSession session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) jndi.lookup(topicName);
        TopicPublisher publisher = session.createPublisher(topic);

        // create/send the message
        Message message = makeMessage(session, modelService, context);
        publisher.publish(message);
        if (Debug.verboseOn()) Debug.logVerbose("Sent JMS Message to " + topicName, module);

        // close the connections
        publisher.close();
        session.close();
        con.close();
        con = null; // closed cleanly, nothing left for the finally block
    } catch (NamingException ne) {
        throw new GenericServiceException("Problems with JNDI lookup.", ne);
    } catch (JMSException je) {
        throw new GenericServiceException("JMS Internal Error.", je);
    } finally {
        // Failure path: release the connection so it does not leak. Closing the
        // connection is relied on to release its session and publisher as well.
        if (con != null) {
            try {
                con.close();
            } catch (JMSException closeProblem) {
                // Log only, so the exception that brought us here is not masked.
                Debug.logWarning(closeProblem, "Unable to close JMS connection for " + topicName, module);
            }
        }
    }
    return ServiceUtil.returnSuccess();
}
/**
 * Sends the service invocation to a JMS queue.
 * The connection factory and the queue are both looked up through JNDI using the
 * names taken from the server configuration.
 *
 * @param modelService the service definition being invoked
 * @param context the service input context, serialized into the message by {@code makeMessage}
 * @param server the server configuration supplying JNDI names, credentials and client id
 * @return the map produced by {@code ServiceUtil.returnSuccess()}
 * @throws GenericServiceException if the JNDI lookups fail, the message cannot be built,
 *         or the JMS provider reports an error
 */
protected Map<String, Object> runQueue(ModelService modelService, Map<String, Object> context, Server server) throws GenericServiceException {
    String serverName = server.getJndiServerName();
    String jndiName = server.getJndiName();
    String queueName = server.getTopicQueue();
    String userName = server.getUsername();
    String password = server.getPassword();
    String clientId = server.getClientId();

    InitialContext jndi = null;
    QueueConnectionFactory factory = null;
    QueueConnection con = null;

    try {
        jndi = JNDIContextFactory.getInitialContext(serverName);
        factory = (QueueConnectionFactory) jndi.lookup(jndiName);
    } catch (GeneralException ge) {
        throw new GenericServiceException("Problems getting JNDI InitialContext.", ge.getNested());
    } catch (NamingException ne) {
        // The lookup failed: drop the context JNDIContextFactory holds for this server and retry once.
        JNDIContextFactory.clearInitialContext(serverName);
        try {
            jndi = JNDIContextFactory.getInitialContext(serverName);
            factory = (QueueConnectionFactory) jndi.lookup(jndiName);
        } catch (GeneralException ge2) {
            throw new GenericServiceException("Problems getting JNDI InitialContext.", ge2.getNested());
        } catch (NamingException ne2) {
            throw new GenericServiceException("JNDI lookup problem.", ne2);
        }
    }

    try {
        con = factory.createQueueConnection(userName, password);

        // Any non-empty client id is applied (a one-character id used to be ignored).
        if (clientId != null && clientId.length() > 0) {
            con.setClientID(clientId);
        }
        con.start();

        QueueSession session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue) jndi.lookup(queueName);
        QueueSender sender = session.createSender(queue);

        // create/send the message
        Message message = makeMessage(session, modelService, context);
        sender.send(message);
        if (Debug.verboseOn()) Debug.logVerbose("Sent JMS Message to " + queueName, module);

        // close the connections
        sender.close();
        session.close();
        con.close();
        con = null; // closed cleanly, nothing left for the finally block
    } catch (NamingException ne) {
        throw new GenericServiceException("Problems with JNDI lookup.", ne);
    } catch (JMSException je) {
        throw new GenericServiceException("JMS Internal Error.", je);
    } finally {
        // Failure path: release the connection so it does not leak. Closing the
        // connection is relied on to release its session and sender as well.
        if (con != null) {
            try {
                con.close();
            } catch (JMSException closeProblem) {
                // Log only, so the exception that brought us here is not masked.
                Debug.logWarning(closeProblem, "Unable to close JMS connection for " + queueName, module);
            }
        }
    }
    return ServiceUtil.returnSuccess();
}
/**
 * Sends the service invocation to a JMS queue through an XA connection.
 * When a transaction is active the session's {@code XAResource} is enlisted with it;
 * otherwise the session is committed directly after the send.
 * Unlike {@code runTopic}/{@code runQueue}, the server settings are read from XML
 * attributes ({@code jndi-server-name}, {@code jndi-name}, {@code topic-queue},
 * {@code username}, {@code password}, {@code client-id}).
 *
 * @param modelService the service definition being invoked
 * @param context the service input context, serialized into the message by {@code makeMessage}
 * @param server the XML element carrying the server attributes
 * @return the map produced by {@code ServiceUtil.returnSuccess()}
 * @throws GenericServiceException if the JNDI lookups fail, the resource cannot be enlisted,
 *         the message cannot be built, or the JMS provider reports an error
 */
protected Map<String, Object> runXaQueue(ModelService modelService, Map<String, Object> context, Element server) throws GenericServiceException {
    String serverName = server.getAttribute("jndi-server-name");
    String jndiName = server.getAttribute("jndi-name");
    String queueName = server.getAttribute("topic-queue");
    String userName = server.getAttribute("username");
    String password = server.getAttribute("password");
    String clientId = server.getAttribute("client-id");

    InitialContext jndi = null;
    XAQueueConnectionFactory factory = null;
    XAQueueConnection con = null;

    try {
        jndi = JNDIContextFactory.getInitialContext(serverName);
        factory = (XAQueueConnectionFactory) jndi.lookup(jndiName);
    } catch (GeneralException ge) {
        throw new GenericServiceException("Problems getting JNDI InitialContext.", ge.getNested());
    } catch (NamingException ne) {
        // The lookup failed: drop the context JNDIContextFactory holds for this server and retry once.
        JNDIContextFactory.clearInitialContext(serverName);
        try {
            jndi = JNDIContextFactory.getInitialContext(serverName);
            factory = (XAQueueConnectionFactory) jndi.lookup(jndiName);
        } catch (GeneralException ge2) {
            throw new GenericServiceException("Problems getting JNDI InitialContext.", ge2.getNested());
        } catch (NamingException ne2) {
            throw new GenericServiceException("JNDI lookup problems.", ne2);
        }
    }

    try {
        con = factory.createXAQueueConnection(userName, password);

        // Use the configured client id; the user name was being passed here by mistake.
        // Any non-empty client id is applied (a one-character id used to be ignored).
        if (clientId != null && clientId.length() > 0) {
            con.setClientID(clientId);
        }
        con.start();

        // enlist the XAResource
        XAQueueSession session = con.createXAQueueSession();
        XAResource resource = session.getXAResource();
        if (TransactionUtil.getStatus() == TransactionUtil.STATUS_ACTIVE) {
            TransactionUtil.enlistResource(resource);
        }

        Queue queue = (Queue) jndi.lookup(queueName);
        QueueSession qSession = session.getQueueSession();
        QueueSender sender = qSession.createSender(queue);

        // create/send the message
        Message message = makeMessage(session, modelService, context);
        sender.send(message);

        // NOTE(review): the JMS API documents XASession.commit() as throwing
        // TransactionInProgressException; confirm this branch works with the provider in use.
        if (TransactionUtil.getStatus() != TransactionUtil.STATUS_ACTIVE) {
            session.commit();
        }
        Debug.logInfo("Message sent.", module);

        // close the connections
        sender.close();
        session.close();
        con.close();
        con = null; // closed cleanly, nothing left for the finally block
    } catch (GenericTransactionException gte) {
        throw new GenericServiceException("Problems enlisting resource w/ transaction manager.", gte.getNested());
    } catch (NamingException ne) {
        throw new GenericServiceException("Problems with JNDI lookup.", ne);
    } catch (JMSException je) {
        throw new GenericServiceException("JMS Internal Error.", je);
    } finally {
        // Failure path: release the connection so it does not leak. Closing the
        // connection is relied on to release its session and sender as well.
        if (con != null) {
            try {
                con.close();
            } catch (JMSException closeProblem) {
                // Log only, so the exception that brought us here is not masked.
                Debug.logWarning(closeProblem, "Unable to close XA JMS connection for " + queueName, module);
            }
        }
    }
    return ServiceUtil.returnSuccess();
}
/**
 * Sends the service invocation to every server configured for the service.
 * Each server is dispatched by its type: {@code topic} goes to {@code runTopic} and
 * {@code queue} goes to {@code runQueue}. The per-server results are merged into one
 * map, so later servers overwrite equal keys from earlier ones.
 *
 * @param modelService the service definition being invoked
 * @param context the service input context
 * @return the merged results; empty when the configuration lists no servers
 * @throws GenericServiceException if no JMS configuration is found, a server has an
 *         unsupported (or missing) type, or sending to a server fails
 */
protected Map<String, Object> run(ModelService modelService, Map<String, Object> context) throws GenericServiceException {
    JmsService serviceElement = getServiceElement(modelService);
    // The lookup may yield null when nothing is configured under this location
    // (assumption: not visible here); fail with a service exception rather than an NPE.
    if (serviceElement == null) {
        throw new GenericServiceException("No JMS service configuration found for location [" + this.getLocation(modelService) + "]");
    }

    List<Server> serverList = serviceElement.getServers();
    Map<String, Object> result = new HashMap<String, Object>();
    for (Server server : serverList) {
        String serverType = server.getType();
        // Constant-first comparison: a missing type falls through to the error below.
        if ("topic".equals(serverType)) {
            result.putAll(runTopic(modelService, context, server));
        } else if ("queue".equals(serverType)) {
            result.putAll(runQueue(modelService, context, server));
        } else {
            throw new GenericServiceException("Illegal server messaging type: " + serverType);
        }
    }
    return result;
}
/**
 * Sends the service call over JMS and returns what {@code run} produced.
 * The {@code localName} argument is not used by this engine.
 *
 * @see org.ofbiz.service.engine.GenericEngine#runSync(java.lang.String, org.ofbiz.service.ModelService, java.util.Map)
 */
public Map<String, Object> runSync(String localName, ModelService modelService, Map<String, Object> context) throws GenericServiceException {
    final Map<String, Object> outcome = this.run(modelService, context);
    return outcome;
}
/**
 * Sends the service call over JMS and discards the result of {@code run}.
 * The {@code localName} argument is not used by this engine.
 *
 * @see org.ofbiz.service.engine.GenericEngine#runSyncIgnore(java.lang.String, org.ofbiz.service.ModelService, java.util.Map)
 */
public void runSyncIgnore(String localName, ModelService modelService, Map<String, Object> context) throws GenericServiceException {
    // Result intentionally dropped: callers of this variant do not want it.
    this.run(modelService, context);
}
/**
 * Sends the service call over JMS on the calling thread, then passes the result of
 * {@code run} to the requester. {@code localName} and {@code persist} are not used here.
 *
 * @see org.ofbiz.service.engine.GenericEngine#runAsync(java.lang.String, org.ofbiz.service.ModelService, java.util.Map, org.ofbiz.service.GenericRequester, boolean)
 */
public void runAsync(String localName, ModelService modelService, Map<String, Object> context, GenericRequester requester, boolean persist) throws GenericServiceException {
    requester.receiveResult(this.run(modelService, context));
}
/**
 * Sends the service call over JMS on the calling thread and discards the result of
 * {@code run}. {@code localName} and {@code persist} are not used here.
 *
 * @see org.ofbiz.service.engine.GenericEngine#runAsync(java.lang.String, org.ofbiz.service.ModelService, java.util.Map, boolean)
 */
public void runAsync(String localName, ModelService modelService, Map<String, Object> context, boolean persist) throws GenericServiceException {
    // No requester to notify, so the result is simply dropped.
    this.run(modelService, context);
}
}