blob: ac9fbce7f566debfe645bc4bc0e56ecad48faac0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.jms.processors.MessageBodyToBytesConverter.MessageConversionException;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
import org.springframework.jms.support.JmsUtils;
/**
* Generic consumer of messages from JMS compliant messaging system.
*/
final class JMSConsumer extends JMSWorker {
JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
super(connectionFactory, jmsTemplate, logger);
logger.debug("Created Message Consumer for '{}'", new Object[] {jmsTemplate});
}
private MessageConsumer createMessageConsumer(final Session session, final String destinationName, final boolean durable, final boolean shared, final String subscriberName) throws JMSException {
final boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain();
final Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub);
if (isPubSub) {
if (shared) {
try {
if (durable) {
return session.createSharedDurableConsumer((Topic) destination, subscriberName);
} else {
return session.createSharedConsumer((Topic) destination, subscriberName);
}
} catch (AbstractMethodError e) {
throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
}
} else {
if (durable) {
return session.createDurableConsumer((Topic) destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
} else {
return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
}
} else {
return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
}
public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
public Void doInJms(final Session session) throws JMSException {
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
try {
final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;
if (message != null) {
String messageType;
byte[] messageBody;
try {
if (message instanceof TextMessage) {
messageType = TextMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
} else if (message instanceof BytesMessage) {
messageType = BytesMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else if (message instanceof ObjectMessage) {
messageType = ObjectMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
} else if (message instanceof StreamMessage) {
messageType = StreamMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);
} else if (message instanceof MapMessage) {
messageType = MapMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message);
} else {
acknowledge(message, session);
if (errorQueueName != null) {
processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName});
jmsTemplate.send(errorQueueName, __ -> message);
} else {
processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message});
}
return null;
}
} catch (final MessageConversionException mce) {
processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.",
new Object[] {message}, mce);
acknowledge(message, session);
if (errorQueueName != null) {
jmsTemplate.send(errorQueueName, __ -> message);
}
return null;
}
final Map<String, String> messageHeaders = extractMessageHeaders(message);
final Map<String, String> messageProperties = extractMessageProperties(message);
response = new JMSResponse(messageType, messageBody, messageHeaders, messageProperties);
}
// invoke the processor callback (regardless if it's null,
// so the processor can yield) as part of this inJMS call
// and ACK message *only* after its successful invocation
// and if CLIENT_ACKNOWLEDGE is set.
consumerCallback.accept(response);
acknowledge(message, session);
} catch (Exception e) {
// We need to call recover to ensure that in the event of
// abrupt end or exception the current session will stop message
// delivery and restart with the oldest unacknowledged message
try {
session.recover();
} catch (Exception e1) {
// likely the session is closed...need to catch this so that the root cause of failure is propagated
processLog.debug("Failed to recover JMS session while handling initial error. The recover error is: ", e1);
}
throw e;
} finally {
JmsUtils.closeMessageConsumer(msgConsumer);
}
return null;
}
}, true);
}
private void acknowledge(final Message message, final Session session) throws JMSException {
if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}
}
@SuppressWarnings("unchecked")
private Map<String, String> extractMessageProperties(final Message message) {
final Map<String, String> properties = new HashMap<>();
try {
final Enumeration<String> propertyNames = message.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String propertyName = propertyNames.nextElement();
properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName)));
}
} catch (JMSException e) {
this.processLog.warn("Failed to extract message properties", e);
}
return properties;
}
private Map<String, String> extractMessageHeaders(final Message message) throws JMSException {
final Map<String, String> messageHeaders = new HashMap<>();
messageHeaders.put(JmsHeaders.DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
messageHeaders.put(JmsHeaders.EXPIRATION, String.valueOf(message.getJMSExpiration()));
messageHeaders.put(JmsHeaders.PRIORITY, String.valueOf(message.getJMSPriority()));
messageHeaders.put(JmsHeaders.REDELIVERED, String.valueOf(message.getJMSRedelivered()));
messageHeaders.put(JmsHeaders.TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
messageHeaders.put(JmsHeaders.CORRELATION_ID, message.getJMSCorrelationID());
messageHeaders.put(JmsHeaders.MESSAGE_ID, message.getJMSMessageID());
messageHeaders.put(JmsHeaders.TYPE, message.getJMSType());
String replyToDestinationName = this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO);
if (replyToDestinationName != null) {
messageHeaders.put(JmsHeaders.REPLY_TO, replyToDestinationName);
}
String destinationName = this.retrieveDestinationName(message.getJMSDestination(), JmsHeaders.DESTINATION);
if (destinationName != null) {
messageHeaders.put(JmsHeaders.DESTINATION, destinationName);
}
return messageHeaders;
}
private String retrieveDestinationName(Destination destination, String headerName) {
String destinationName = null;
if (destination != null) {
try {
destinationName = (destination instanceof Queue) ? ((Queue) destination).getQueueName()
: ((Topic) destination).getTopicName();
} catch (JMSException e) {
this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
}
}
return destinationName;
}
static class JMSResponse {
private final byte[] messageBody;
private final String messageType;
private final Map<String, String> messageHeaders;
private final Map<String, String> messageProperties;
JMSResponse(String messageType, byte[] messageBody, Map<String, String> messageHeaders, Map<String, String> messageProperties) {
this.messageType = messageType;
this.messageBody = messageBody;
this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
this.messageProperties = Collections.unmodifiableMap(messageProperties);
}
public String getMessageType() {
return messageType;
}
public byte[] getMessageBody() {
return this.messageBody;
}
public Map<String, String> getMessageHeaders() {
return this.messageHeaders;
}
public Map<String, String> getMessageProperties() {
return messageProperties;
}
}
/**
* Callback to be invoked while executing inJMS call (the call within the
* live JMS session)
*/
static interface ConsumerCallback {
void accept(JMSResponse response);
}
}