blob: f1f1addb36e81868fd678de022924a8af5bc7fc1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.jms;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
import org.apache.camel.component.jms.reply.ReplyManager;
import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.spi.UuidGenerator;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ValueHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
/**
* @version
*/
public class JmsProducer extends DefaultAsyncProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(JmsProducer.class);
private final JmsEndpoint endpoint;
private final AtomicBoolean started = new AtomicBoolean(false);
private JmsOperations inOnlyTemplate;
private JmsOperations inOutTemplate;
private UuidGenerator uuidGenerator;
private ReplyManager replyManager;
public JmsProducer(JmsEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
protected void initReplyManager() {
if (!started.get()) {
synchronized (this) {
if (started.get()) {
return;
}
try {
if (endpoint.getReplyTo() != null) {
replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from: " + endpoint.getReplyTo());
} else {
replyManager = endpoint.getReplyManager();
LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from temporary queue");
}
} catch (Exception e) {
throw new FailedToCreateProducerException(endpoint, e);
}
started.set(true);
}
}
}
public boolean process(Exchange exchange, AsyncCallback callback) {
// deny processing if we are not started
if (!isRunAllowed()) {
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
// we cannot process so invoke callback
callback.done(true);
return true;
}
if (!endpoint.isDisableReplyTo() && exchange.getPattern().isOutCapable()) {
// in out requires a bit more work than in only
return processInOut(exchange, callback);
} else {
// in only
return processInOnly(exchange, callback);
}
}
protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) {
final org.apache.camel.Message in = exchange.getIn();
String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class);
// remove the header so it wont be propagated
in.removeHeader(JmsConstants.JMS_DESTINATION_NAME);
if (destinationName == null) {
destinationName = endpoint.getDestinationName();
}
Destination destination = in.getHeader(JmsConstants.JMS_DESTINATION, Destination.class);
// remove the header so it wont be propagated
in.removeHeader(JmsConstants.JMS_DESTINATION);
if (destination == null) {
destination = endpoint.getDestination();
}
if (destination != null) {
// prefer to use destination over destination name
destinationName = null;
}
initReplyManager();
// when using message id as correlation id, we need at first to use a provisional correlation id
// which we then update to the real JMSMessageID when the message has been sent
// this is done with the help of the MessageSentCallback
final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
final String provisionalCorrelationId = msgIdAsCorrId ? getUuidGenerator().generateUuid() : null;
MessageSentCallback messageSentCallback = null;
if (msgIdAsCorrId) {
messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, endpoint.getRequestTimeout());
}
final ValueHolder<MessageSentCallback> sentCallback = new ValueHolder<MessageSentCallback>(messageSentCallback);
final String originalCorrelationId = in.getHeader("JMSCorrelationID", String.class);
if (originalCorrelationId == null && !msgIdAsCorrId) {
in.setHeader("JMSCorrelationID", getUuidGenerator().generateUuid());
}
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
// get the reply to destination to be used from the reply manager
Destination replyTo = replyManager.getReplyTo();
if (replyTo == null) {
throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
}
JmsMessageHelper.setJMSReplyTo(message, replyTo);
replyManager.setReplyToSelectorHeader(in, message);
String correlationId = determineCorrelationId(message, provisionalCorrelationId);
replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, endpoint.getRequestTimeout());
return message;
}
};
doSend(true, destinationName, destination, messageCreator, sentCallback.get());
// after sending then set the OUT message id to the JMSMessageID so its identical
setMessageId(exchange);
// continue routing asynchronously (reply will be processed async when its received)
return false;
}
/**
* Strategy to determine which correlation id to use among <tt>JMSMessageID</tt> and <tt>JMSCorrelationID</tt>.
*
* @param message the JMS message
* @param provisionalCorrelationId an optional provisional correlation id, which is preferred to be used
* @return the correlation id to use
* @throws JMSException can be thrown
*/
protected String determineCorrelationId(Message message, String provisionalCorrelationId) throws JMSException {
if (provisionalCorrelationId != null) {
return provisionalCorrelationId;
}
final String messageId = message.getJMSMessageID();
final String correlationId = message.getJMSCorrelationID();
if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
return messageId;
} else if (ObjectHelper.isEmpty(correlationId)) {
// correlation id is empty so fallback to message id
return messageId;
} else {
return correlationId;
}
}
protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
final org.apache.camel.Message in = exchange.getIn();
String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class);
if (destinationName != null) {
// remove the header so it wont be propagated
in.removeHeader(JmsConstants.JMS_DESTINATION_NAME);
}
if (destinationName == null) {
destinationName = endpoint.getDestinationName();
}
Destination destination = in.getHeader(JmsConstants.JMS_DESTINATION, Destination.class);
if (destination != null) {
// remove the header so it wont be propagated
in.removeHeader(JmsConstants.JMS_DESTINATION);
}
if (destination == null) {
destination = endpoint.getDestination();
}
if (destination != null) {
// prefer to use destination over destination name
destinationName = null;
}
final String to = destinationName != null ? destinationName : "" + destination;
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
// when in InOnly mode the JMSReplyTo is a bit complicated
// we only want to set the JMSReplyTo on the answer if
// there is a JMSReplyTo from the header/endpoint and
// we have been told to preserveMessageQos
Object jmsReplyTo = JmsMessageHelper.getJMSReplyTo(answer);
if (endpoint.isDisableReplyTo()) {
// honor disable reply to configuration
if (LOG.isDebugEnabled()) {
LOG.debug("ReplyTo is disabled on endpoint: " + endpoint);
}
JmsMessageHelper.setJMSReplyTo(answer, null);
} else {
// if the binding did not create the reply to then we have to try to create it here
if (jmsReplyTo == null) {
// prefer reply to from header over endpoint configured
jmsReplyTo = exchange.getIn().getHeader("JMSReplyTo", String.class);
if (jmsReplyTo == null) {
jmsReplyTo = endpoint.getReplyTo();
}
}
}
// we must honor these special flags to preserve QoS
// as we are not OUT capable and thus do not expect a reply, and therefore
// the consumer of this message should not return a reply so we remove it
// unless we use preserveMessageQos=true to tell that we still want to use JMSReplyTo
if (jmsReplyTo != null && !(endpoint.isPreserveMessageQos() || endpoint.isExplicitQosEnabled())) {
// log at debug what we are doing, as higher level may cause noise in production logs
// this behavior is also documented at the camel website
if (LOG.isDebugEnabled()) {
LOG.debug("Disabling JMSReplyTo: " + jmsReplyTo + " for destination: " + to
+ ". Use preserveMessageQos=true to force Camel to keep the JMSReplyTo on endpoint: " + endpoint);
}
jmsReplyTo = null;
}
// the reply to is a String, so we need to look up its Destination instance
// and if needed create the destination using the session if needed to
if (jmsReplyTo != null && jmsReplyTo instanceof String) {
// must normalize the destination name
String replyTo = normalizeDestinationName((String) jmsReplyTo);
// we need to null it as we use the String to resolve it as a Destination instance
jmsReplyTo = null;
// try using destination resolver to lookup the destination
if (endpoint.getDestinationResolver() != null) {
jmsReplyTo = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, endpoint.isPubSubDomain());
}
if (jmsReplyTo == null) {
// okay then fallback and create the queue
if (endpoint.isPubSubDomain()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating JMSReplyTo topic: " + replyTo);
}
jmsReplyTo = session.createTopic(replyTo);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating JMSReplyTo queue: " + replyTo);
}
jmsReplyTo = session.createQueue(replyTo);
}
}
}
// set the JMSReplyTo on the answer if we are to use it
Destination replyTo = null;
if (jmsReplyTo instanceof Destination) {
replyTo = (Destination) jmsReplyTo;
}
if (replyTo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using JMSReplyTo destination: " + replyTo);
}
JmsMessageHelper.setJMSReplyTo(answer, replyTo);
} else {
// do not use JMSReplyTo
JmsMessageHelper.setJMSReplyTo(answer, null);
}
return answer;
}
};
doSend(false, destinationName, destination, messageCreator, null);
// after sending then set the OUT message id to the JMSMessageID so its identical
setMessageId(exchange);
// we are synchronous so return true
callback.done(true);
return true;
}
/**
* Sends the message using the JmsTemplate.
*
* @param inOut use inOut or inOnly template
* @param destinationName the destination name
* @param destination the destination (if no name provided)
* @param messageCreator the creator to create the {@link Message} to send
* @param callback optional callback for inOut messages
*/
protected void doSend(boolean inOut, String destinationName, Destination destination,
MessageCreator messageCreator, MessageSentCallback callback) {
CamelJmsTemplate template = (CamelJmsTemplate) (inOut ? getInOutTemplate() : getInOnlyTemplate());
if (LOG.isTraceEnabled()) {
LOG.trace("Using " + (inOut ? "inOut" : "inOnly") + " jms template");
}
// destination should be preferred
if (destination != null) {
if (inOut) {
if (template != null) {
template.send(destination, messageCreator, callback);
}
} else {
if (template != null) {
template.send(destination, messageCreator);
}
}
} else if (destinationName != null) {
if (inOut) {
if (template != null) {
template.send(destinationName, messageCreator, callback);
}
} else {
if (template != null) {
template.send(destinationName, messageCreator);
}
}
} else {
throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
}
}
protected void setMessageId(Exchange exchange) {
if (exchange.hasOut()) {
JmsMessage out = (JmsMessage) exchange.getOut();
try {
if (out != null && out.getJmsMessage() != null) {
out.setMessageId(out.getJmsMessage().getJMSMessageID());
}
} catch (JMSException e) {
LOG.warn("Unable to retrieve JMSMessageID from outgoing "
+ "JMS Message and set it into Camel's MessageId", e);
}
}
}
public JmsOperations getInOnlyTemplate() {
if (inOnlyTemplate == null) {
inOnlyTemplate = endpoint.createInOnlyTemplate();
}
return inOnlyTemplate;
}
public void setInOnlyTemplate(JmsOperations inOnlyTemplate) {
this.inOnlyTemplate = inOnlyTemplate;
}
public JmsOperations getInOutTemplate() {
if (inOutTemplate == null) {
inOutTemplate = endpoint.createInOutTemplate();
}
return inOutTemplate;
}
public void setInOutTemplate(JmsOperations inOutTemplate) {
this.inOutTemplate = inOutTemplate;
}
public UuidGenerator getUuidGenerator() {
return uuidGenerator;
}
public void setUuidGenerator(UuidGenerator uuidGenerator) {
this.uuidGenerator = uuidGenerator;
}
protected void doStart() throws Exception {
super.doStart();
if (uuidGenerator == null) {
// use the generator configured on the camel context
uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator();
}
}
protected void doStop() throws Exception {
super.doStop();
}
}