blob: 8b69e6558ccb3617dfca4a9885d8b65ff266216c [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.transport.jms;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMText;
import org.apache.axiom.om.OMNode;
import org.apache.axis2.util.MessageProcessorSelector;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.transport.MessageFormatter;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.*;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.axis2.transport.jms.iowrappers.BytesMessageOutputStream;
import org.apache.commons.io.output.WriterOutputStream;
import javax.jms.*;
import javax.activation.DataHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.nio.charset.UnsupportedCharsetException;
import java.util.*;
/**
 * The TransportSender for JMS.
 *
 * <p>Converts an outgoing Axis2 {@link MessageContext} into a JMS {@link Message}, sends it
 * through a {@link JMSMessageSender} and, when the exchange is a synchronous out-in, waits on
 * the calling thread for the correlated reply and hands it back to Axis2.
 */
public class JMSSender extends AbstractTransportSender implements ManagementSupport {

    public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;

    /** The JMS connection factory manager to be used when sending messages out */
    private JMSConnectionFactoryManager connFacManager;

    /**
     * Initialize the transport sender by reading pre-defined connection factories for
     * outgoing messages.
     *
     * @param cfgCtx the configuration context
     * @param transportOut the transport sender definition from axis2.xml
     * @throws AxisFault on error
     */
    @Override
    public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
        super.init(cfgCtx, transportOut);
        connFacManager = new JMSConnectionFactoryManager(transportOut);
        log.info("JMS Transport Sender initialized...");
    }

    /**
     * Stops the connection factory manager (if one was created by {@link #init}) and then
     * delegates to the superclass.
     */
    @Override
    public void stop() {
        // clean up any shared JMS resources in this sender's connection factories;
        // the manager is only assigned in init(), so guard against stop() without init()
        if (connFacManager != null) {
            connFacManager.stop();
        }
        super.stop();
    }

    /**
     * Get corresponding JMS connection factory defined within the transport sender for the
     * transport-out information - usually constructed from a targetEPR
     *
     * <p>Lookup order: the factory explicitly named by {@code JMSConstants.PARAM_JMS_CONFAC},
     * otherwise a factory looked up from the full property map, otherwise the factory
     * registered under {@code JMSConstants.DEFAULT_CONFAC_NAME}.
     *
     * @param trpInfo the transport-out information
     * @return the corresponding JMS connection factory, or null if the transport info carries
     *         no properties or no factory could be found
     */
    private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) {
        Map<String,String> props = trpInfo.getProperties();
        if (props == null) {
            return null;
        }

        String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC);
        if (jmsConnectionFactoryName != null) {
            return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName);
        }

        JMSConnectionFactory fac = connFacManager.getJMSConnectionFactory(props);
        if (fac == null) {
            fac = connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME);
        }
        return fac;
    }

    /**
     * Performs the actual sending of the JMS message
     *
     * @param msgCtx the outgoing message context
     * @param targetAddress the target JMS address; when non-null it takes precedence over
     *                      {@code outTransportInfo}
     * @param outTransportInfo the out transport information, used only when
     *                         {@code targetAddress} is null and it is a JMSOutTransportInfo
     * @throws AxisFault if no message sender can be created or the send fails
     */
    @Override
    public void sendMessage(MessageContext msgCtx, String targetAddress,
        OutTransportInfo outTransportInfo) throws AxisFault {

        JMSConnectionFactory jmsConnectionFactory = null;
        JMSOutTransportInfo jmsOut = null;
        JMSMessageSender messageSender = null;

        if (targetAddress != null) {
            jmsOut = new JMSOutTransportInfo(targetAddress);
            // do we have a definition for a connection factory to use for this address?
            jmsConnectionFactory = getJMSConnectionFactory(jmsOut);

            if (jmsConnectionFactory != null) {
                messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress);
            } else {
                try {
                    messageSender = jmsOut.createJMSSender();
                } catch (JMSException e) {
                    // report the address this sender was actually being created for
                    handleException("Unable to create a JMSMessageSender for : " + targetAddress, e);
                }
            }
        } else if (outTransportInfo instanceof JMSOutTransportInfo) {
            jmsOut = (JMSOutTransportInfo) outTransportInfo;
            try {
                messageSender = jmsOut.createJMSSender();
            } catch (JMSException e) {
                handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e);
            }
        }

        // Neither a target address nor usable JMS out transport information was supplied
        // (or no sender could be obtained): fail with a descriptive fault instead of a
        // NullPointerException further down.
        if (jmsOut == null || messageSender == null) {
            String msg = "Unable to send the message over JMS : no target address or JMS " +
                "out transport information is available (out transport info : " +
                outTransportInfo + ")";
            log.error(msg);
            throw new AxisFault(msg);
        }

        // The message property to be used to send the content type is determined by
        // the out transport info, i.e. either from the EPR if we are sending a request,
        // or, if we are sending a response, from the configuration of the service that
        // received the request). The property name can be overridden by a message
        // context property.
        String contentTypeProperty =
            (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
        if (contentTypeProperty == null) {
            contentTypeProperty = jmsOut.getContentTypeProperty();
        }

        // need to synchronize as Sessions are not thread safe
        synchronized (messageSender.getSession()) {
            try {
                sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut);
            } finally {
                messageSender.close();
            }
        }
    }

    /**
     * Perform actual sending of the JMS message
     *
     * @param msgCtx the outgoing message context
     * @param messageSender the sender (and its session/connection) used for this message
     * @param contentTypeProperty the message property used to carry the content type, may be null
     * @param jmsConnectionFactory the connection factory in use, or null if none was resolved
     * @param jmsOut the out transport information for this message
     * @throws AxisFault if the message cannot be created or sent, or the reply cannot be processed
     */
    private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender,
        String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory,
        JMSOutTransportInfo jmsOut) throws AxisFault {

        // convert the axis message context into a JMS Message that we can send over JMS
        Message message = null;
        String correlationId = null;
        try {
            message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty);
        } catch (JMSException e) {
            handleException("Error creating a JMS message from the message context", e);
        }

        // should we wait for a synchronous response on this same thread?
        boolean waitForResponse = waitForSynchronousResponse(msgCtx);
        Destination replyDestination = jmsOut.getReplyDestination();

        // if this is a synchronous out-in, prepare to listen on the response destination
        if (waitForResponse) {

            // message context properties win over the connection factory defaults
            String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO);
            if (replyDestName == null && jmsConnectionFactory != null) {
                replyDestName = jmsConnectionFactory.getReplyToDestination();
            }

            String replyDestType = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO_TYPE);
            if (replyDestType == null && jmsConnectionFactory != null) {
                replyDestType = jmsConnectionFactory.getReplyDestinationType();
            }

            if (replyDestName != null) {
                if (jmsConnectionFactory != null) {
                    replyDestination = jmsConnectionFactory.getDestination(
                        replyDestName, replyDestType);
                } else {
                    replyDestination = jmsOut.getReplyDestination(replyDestName);
                }
            }
            replyDestination = JMSUtils.setReplyDestination(
                replyDestination, messageSender.getSession(), message);
        }

        try {
            messageSender.send(message, msgCtx);
            metrics.incrementMessagesSent(msgCtx);
        } catch (AxisJMSException e) {
            metrics.incrementFaultsSending();
            handleException("Error sending JMS message", e);
        }

        try {
            metrics.incrementBytesSent(msgCtx, JMSUtils.getMessageSize(message));
        } catch (JMSException e) {
            log.warn("Error reading JMS message size to update transport metrics", e);
        }

        // if we are expecting a synchronous response back for the message sent out
        if (waitForResponse) {
            // TODO replace this blocking wait with asynchronous polling: a poller task should
            // poll the requested destination (until timeout) for the response message and
            // inject it from an asynchronous worker thread
            try {
                messageSender.getConnection().start();  // multiple calls are safely ignored
            } catch (JMSException e) {
                // best effort: keep going, but leave a trace since the wait below may time out
                log.warn("Error starting the JMS connection to receive the synchronous response", e);
            }

            try {
                String jmsCorrelationID = message.getJMSCorrelationID();
                if (jmsCorrelationID != null && jmsCorrelationID.length() > 0) {
                    correlationId = jmsCorrelationID;
                } else {
                    correlationId = message.getJMSMessageID();
                }
            } catch (JMSException e) {
                // best effort: the selector below will then not match on a usable ID
                log.warn("Error reading the JMS correlation ID / message ID of the sent " +
                    "message to correlate the synchronous response", e);
            }

            // We assume here that the response uses the same message property to
            // specify the content type of the message.
            waitForResponseAndProcess(messageSender.getSession(), replyDestination,
                msgCtx, correlationId, contentTypeProperty);
        }
    }

    /**
     * Create a Consumer for the reply destination and wait for the response JMS message
     * synchronously. If a message arrives within the specified time interval, process it
     * through Axis2. The consumer is closed before this method returns.
     *
     * @param session the session to use to listen for the response
     * @param replyDestination the JMS reply Destination
     * @param msgCtx the outgoing message for which we are expecting the response
     * @param correlationId the JMS correlation ID the response is selected by
     * @param contentTypeProperty the message property used to determine the content type
     *                            of the response message
     * @throws AxisFault on error
     */
    private void waitForResponseAndProcess(Session session, Destination replyDestination,
        MessageContext msgCtx, String correlationId,
        String contentTypeProperty) throws AxisFault {

        MessageConsumer consumer = null;
        try {
            consumer = JMSUtils.createConsumer(session, replyDestination,
                "JMSCorrelationID = '" + escapeSelectorLiteral(correlationId) + "'");

            // how long are we willing to wait for the sync response
            long timeout = getReplyTimeout(msgCtx);

            if (log.isDebugEnabled()) {
                log.debug("Waiting for a maximum of " + timeout +
                    "ms for a response message to destination : " + replyDestination +
                    " with JMS correlation ID : " + correlationId);
            }

            Message reply = consumer.receive(timeout);

            if (reply != null) {

                // update transport level metrics
                try {
                    metrics.incrementBytesReceived(JMSUtils.getMessageSize(reply));
                } catch (JMSException e) {
                    log.warn("Error reading JMS message size to update transport metrics", e);
                }

                try {
                    processSyncResponse(msgCtx, reply, contentTypeProperty);
                    // count the reply exactly once, after it was processed successfully
                    metrics.incrementMessagesReceived();
                } catch (AxisFault e) {
                    metrics.incrementFaultsReceiving();
                    throw e;
                }

            } else {
                log.warn("Did not receive a JMS response within " +
                    timeout + " ms to destination : " + replyDestination +
                    " with JMS correlation ID : " + correlationId);
                metrics.incrementTimeoutsReceiving();
            }

        } catch (JMSException e) {
            metrics.incrementFaultsReceiving();
            handleException("Error creating a consumer, or receiving a synchronous reply " +
                "for outgoing MessageContext ID : " + msgCtx.getMessageID() +
                " and reply Destination : " + replyDestination, e);
        } finally {
            // the consumer is only needed for this single receive; release it so that it
            // does not accumulate on a session that outlives this call
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    log.warn("Error closing the JMS consumer used to receive the " +
                        "synchronous response", e);
                }
            }
        }
    }

    /**
     * Determine how long to wait for a synchronous reply: the value of the
     * {@code JMSConstants.JMS_WAIT_REPLY} message context property when it is a valid
     * number, otherwise {@code JMSConstants.DEFAULT_JMS_TIMEOUT}.
     *
     * @param msgCtx the outgoing message context
     * @return the timeout to pass to the consumer's receive call
     */
    private long getReplyTimeout(MessageContext msgCtx) {
        long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT;
        String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY);
        if (waitReply != null) {
            try {
                timeout = Long.parseLong(waitReply.trim());
            } catch (NumberFormatException e) {
                log.warn("Invalid value '" + waitReply + "' for the property " +
                    JMSConstants.JMS_WAIT_REPLY + ", using the default timeout of " +
                    timeout + "ms", e);
            }
        }
        return timeout;
    }

    /**
     * Escape a value for use inside a single-quoted string literal of a JMS message
     * selector, by doubling every single quote.
     *
     * @param value the raw value, may be null
     * @return the escaped value, or null if the value was null
     */
    private static String escapeSelectorLiteral(String value) {
        return value == null ? null : value.replace("'", "''");
    }

    /**
     * Create a JMS Message from the given MessageContext and using the given
     * session
     *
     * <p>If the first element of the SOAP body is one of the default binary or text wrapper
     * elements, only the wrapped payload is sent (as a BytesMessage or TextMessage
     * respectively). Otherwise the message is serialized with the message formatter selected
     * for the message context.
     *
     * @param msgContext the MessageContext
     * @param session the JMS session
     * @param contentTypeProperty the message property to be used to store the
     *                            content type
     * @return a JMS message from the context and session
     * @throws JMSException on exception
     * @throws AxisFault on exception
     */
    private Message createJMSMessage(MessageContext msgContext, Session session,
        String contentTypeProperty) throws JMSException, AxisFault {

        Message message = null;
        String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE);

        // check the first element of the SOAP body, do we have content wrapped using the
        // default wrapper elements for binary (BaseConstants.DEFAULT_BINARY_WRAPPER) or
        // text (BaseConstants.DEFAULT_TEXT_WRAPPER) ? If so, do not create SOAP messages
        // for JMS but just get the payload in its native format
        String jmsPayloadType = guessMessageType(msgContext);

        if (jmsPayloadType == null) {

            OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
            MessageFormatter messageFormatter = null;
            try {
                messageFormatter = MessageProcessorSelector.getMessageFormatter(msgContext);
            } catch (AxisFault axisFault) {
                // keep the original failure attached so it is not lost to the caller
                JMSException jmsEx =
                    new JMSException("Unable to get the message formatter to use");
                jmsEx.setLinkedException(axisFault);
                jmsEx.initCause(axisFault);
                throw jmsEx;
            }

            String contentType = messageFormatter.getContentType(
                msgContext, format, msgContext.getSoapAction());

            // a bytes message is used when explicitly requested, or for multipart content
            boolean useBytesMessage =
                JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) ||
                    contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1;

            OutputStream out;
            StringWriter sw;
            if (useBytesMessage) {
                BytesMessage bytesMsg = session.createBytesMessage();
                sw = null;
                out = new BytesMessageOutputStream(bytesMsg);
                message = bytesMsg;
            } else {
                sw = new StringWriter();
                try {
                    out = new WriterOutputStream(sw, format.getCharSetEncoding());
                } catch (UnsupportedCharsetException ex) {
                    handleException("Unsupported encoding " + format.getCharSetEncoding(), ex);
                    // NOTE(review): handleException presumably always throws; this return
                    // keeps 'out' definitely assigned for the compiler
                    return null;
                }
            }

            try {
                messageFormatter.writeTo(msgContext, format, out, true);
                out.close();
            } catch (IOException e) {
                handleException("IO Error while creating BytesMessage", e);
            }

            if (!useBytesMessage) {
                // the text was buffered in the StringWriter; wrap it once fully written
                TextMessage txtMsg = session.createTextMessage();
                txtMsg.setText(sw.toString());
                message = txtMsg;
            }

            if (contentTypeProperty != null) {
                message.setStringProperty(contentTypeProperty, contentType);
            }

        } else if (JMSConstants.JMS_BYTE_MESSAGE.equals(jmsPayloadType)) {
            message = session.createBytesMessage();
            BytesMessage bytesMsg = (BytesMessage) message;
            OMElement wrapper = msgContext.getEnvelope().getBody().
                getFirstChildWithName(BaseConstants.DEFAULT_BINARY_WRAPPER);
            OMNode omNode = wrapper.getFirstOMChild();
            if (omNode instanceof OMText) {
                Object dh = ((OMText) omNode).getDataHandler();
                if (dh instanceof DataHandler) {
                    try {
                        ((DataHandler) dh).writeTo(new BytesMessageOutputStream(bytesMsg));
                    } catch (IOException e) {
                        handleException("Error serializing binary content of element : " +
                            BaseConstants.DEFAULT_BINARY_WRAPPER, e);
                    }
                }
            }

        } else if (JMSConstants.JMS_TEXT_MESSAGE.equals(jmsPayloadType)) {
            message = session.createTextMessage();
            TextMessage txtMsg = (TextMessage) message;
            txtMsg.setText(msgContext.getEnvelope().getBody().
                getFirstChildWithName(BaseConstants.DEFAULT_TEXT_WRAPPER).getText());
        }

        // set the JMS correlation ID if specified
        String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID);
        if (correlationId == null && msgContext.getRelatesTo() != null) {
            correlationId = msgContext.getRelatesTo().getValue();
        }

        if (correlationId != null) {
            message.setJMSCorrelationID(correlationId);
        }

        if (msgContext.isServerSide()) {
            // set SOAP Action as a property on the JMS message
            setProperty(message, msgContext, BaseConstants.SOAPACTION);
        } else {
            String action = msgContext.getOptions().getAction();
            if (action != null) {
                message.setStringProperty(BaseConstants.SOAPACTION, action);
            }
        }

        JMSUtils.setTransportHeaders(msgContext, message);
        return message;
    }

    /**
     * Guess the message type to use for JMS looking at the message contexts' envelope
     *
     * @param msgContext the message context
     * @return JMSConstants.JMS_BYTE_MESSAGE or JMSConstants.JMS_TEXT_MESSAGE or null
     */
    private String guessMessageType(MessageContext msgContext) {
        OMElement firstChild = msgContext.getEnvelope().getBody().getFirstElement();
        if (firstChild == null) {
            return null;
        }
        if (BaseConstants.DEFAULT_BINARY_WRAPPER.equals(firstChild.getQName())) {
            return JMSConstants.JMS_BYTE_MESSAGE;
        }
        if (BaseConstants.DEFAULT_TEXT_WRAPPER.equals(firstChild.getQName())) {
            return JMSConstants.JMS_TEXT_MESSAGE;
        }
        return null;
    }

    /**
     * Creates an Axis MessageContext for the received JMS message and
     * sets up the transports and various properties
     *
     * @param outMsgCtx the outgoing message for which we are expecting the response
     * @param message the JMS response message received
     * @param contentTypeProperty the message property used to determine the content type
     *                            of the response message
     * @throws AxisFault on error
     */
    private void processSyncResponse(MessageContext outMsgCtx, Message message,
        String contentTypeProperty) throws AxisFault {

        MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx);

        // load any transport headers from received message
        JMSUtils.loadTransportHeaders(message, responseMsgCtx);

        String contentType = contentTypeProperty == null ? null
            : JMSUtils.getProperty(message, contentTypeProperty);

        try {
            JMSUtils.setSOAPEnvelope(message, responseMsgCtx, contentType);
        } catch (JMSException ex) {
            throw AxisFault.makeFault(ex);
        }

        handleIncomingMessage(
            responseMsgCtx,
            JMSUtils.getTransportHeaders(message),
            JMSUtils.getProperty(message, BaseConstants.SOAPACTION),
            contentType
        );
    }

    /**
     * Copy a String property from the message context onto the JMS message under the same
     * key, if the property is set. A failure to set the property is logged, not propagated.
     *
     * @param message the JMS message to set the property on
     * @param msgCtx the message context to read the property from
     * @param key the property name
     */
    private void setProperty(Message message, MessageContext msgCtx, String key) {
        String value = getProperty(msgCtx, key);
        if (value != null) {
            try {
                message.setStringProperty(key, value);
            } catch (JMSException e) {
                log.warn("Couldn't set message property : " + key + " = " + value, e);
            }
        }
    }

    /**
     * Read a message context property as a String.
     *
     * @param mc the message context
     * @param key the property name
     * @return the property value cast to String, or null if not set
     */
    private String getProperty(MessageContext mc, String key) {
        return (String) mc.getProperty(key);
    }
}