blob: 61f5df5ea6cc57f0118e73231a60b9ffdddb5a23 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.transport.jms;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.MetricsCollector;
import org.apache.axis2.transport.jms.ctype.ContentTypeInfo;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.*;
import javax.transaction.UserTransaction;
/**
* This is the JMS message receiver which is invoked when a message is received. This processes
* the message through the engine
*/
public class JMSMessageReceiver {
private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);
/** The JMSListener */
private JMSListener jmsListener = null;
/** A reference to the JMS Connection Factory */
private JMSConnectionFactory jmsConnectionFactory = null;
/** The JMS metrics collector */
private MetricsCollector metrics = null;
/** The endpoint this message receiver is bound to */
final JMSEndpoint endpoint;
/**
* Create a new JMSMessage receiver
*
* @param jmsListener the JMS transport Listener
* @param jmsConFac the JMS connection factory we are associated with
* @param workerPool the worker thread pool to be used
* @param cfgCtx the axis ConfigurationContext
* @param serviceName the name of the Axis service
* @param endpoint the JMSEndpoint definition to be used
*/
JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) {
this.jmsListener = jmsListener;
this.jmsConnectionFactory = jmsConFac;
this.endpoint = endpoint;
this.metrics = jmsListener.getMetricsCollector();
}
/**
* Process a new message received
*
* @param message the JMS message received
* @param ut UserTransaction which was used to receive the message
* @return true if caller should commit
*/
public boolean onMessage(Message message, UserTransaction ut) {
try {
if (log.isDebugEnabled()) {
StringBuffer sb = new StringBuffer();
sb.append("Received new JMS message for service :").append(endpoint.getServiceName());
sb.append("\nDestination : ").append(message.getJMSDestination());
sb.append("\nMessage ID : ").append(message.getJMSMessageID());
sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID());
sb.append("\nReplyTo : ").append(message.getJMSReplyTo());
sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered());
sb.append("\nPriority : ").append(message.getJMSPriority());
sb.append("\nExpiration : ").append(message.getJMSExpiration());
sb.append("\nTimestamp : ").append(message.getJMSTimestamp());
sb.append("\nMessage Type : ").append(message.getJMSType());
sb.append("\nPersistent ? : ").append(
DeliveryMode.PERSISTENT == message.getJMSDeliveryMode());
log.debug(sb.toString());
if (log.isTraceEnabled() && message instanceof TextMessage) {
log.trace("\nMessage : " + ((TextMessage) message).getText());
}
}
} catch (JMSException e) {
if (log.isDebugEnabled()) {
log.debug("Error reading JMS message headers for debug logging", e);
}
}
// update transport level metrics
try {
metrics.incrementBytesReceived(JMSUtils.getMessageSize(message));
} catch (JMSException e) {
log.warn("Error reading JMS message size to update transport metrics", e);
}
// has this message already expired? expiration time == 0 means never expires
// TODO: explain why this is necessary; normally it is the responsibility of the provider to handle message expiration
try {
long expiryTime = message.getJMSExpiration();
if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
if (log.isDebugEnabled()) {
log.debug("Discard expired message with ID : " + message.getJMSMessageID());
}
return true;
}
} catch (JMSException ignore) {}
boolean successful = false;
try {
successful = processThoughEngine(message, ut);
} catch (JMSException e) {
log.error("JMS Exception encountered while processing", e);
} catch (AxisFault e) {
log.error("Axis fault processing message", e);
} catch (Exception e) {
log.error("Unknown error processing message", e);
} finally {
if (successful) {
metrics.incrementMessagesReceived();
} else {
metrics.incrementFaultsReceiving();
}
}
return successful;
}
/**
* Process the new message through Axis2
*
* @param message the JMS message
* @param ut the UserTransaction used for receipt
* @return true if the caller should commit
* @throws JMSException, on JMS exceptions
* @throws AxisFault on Axis2 errors
*/
private boolean processThoughEngine(Message message, UserTransaction ut)
throws JMSException, AxisFault {
MessageContext msgContext = endpoint.createMessageContext();
// set the JMS Message ID as the Message ID of the MessageContext
try {
msgContext.setMessageID(message.getJMSMessageID());
String jmsCorrelationID = message.getJMSCorrelationID();
if (jmsCorrelationID != null && jmsCorrelationID.length() > 0) {
msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, jmsCorrelationID);
} else {
msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID());
}
} catch (JMSException ignore) {}
String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION);
ContentTypeInfo contentTypeInfo =
endpoint.getContentTypeRuleSet().getContentTypeInfo(message);
if (contentTypeInfo == null) {
throw new AxisFault("Unable to determine content type for message " +
msgContext.getMessageID());
}
// set the message property OUT_TRANSPORT_INFO
// the reply is assumed to be over the JMSReplyTo destination, using
// the same incoming connection factory, if a JMSReplyTo is available
Destination replyTo = message.getJMSReplyTo();
if (replyTo == null) {
// does the service specify a default reply destination ?
String jndiReplyDestinationName = endpoint.getJndiReplyDestinationName();
if (jndiReplyDestinationName != null) {
replyTo = jmsConnectionFactory.getDestination(jndiReplyDestinationName,
endpoint.getReplyDestinationType());
}
}
if (replyTo != null) {
msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
new JMSOutTransportInfo(jmsConnectionFactory, replyTo,
contentTypeInfo.getPropertyName()));
}
JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType());
if (ut != null) {
msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut);
}
jmsListener.handleIncomingMessage(
msgContext,
JMSUtils.getTransportHeaders(message),
soapAction,
contentTypeInfo.getContentType());
Object o = msgContext.getProperty(BaseConstants.SET_ROLLBACK_ONLY);
if (o != null) {
if ((o instanceof Boolean && ((Boolean) o)) ||
(o instanceof String && Boolean.valueOf((String) o))) {
return false;
}
}
return true;
}
}