| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.jms.processors; |
| |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.WritesAttribute; |
| import org.apache.nifi.annotation.behavior.WritesAttributes; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.SeeAlso; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; |
| import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback; |
| import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.springframework.jms.connection.CachingConnectionFactory; |
| import org.springframework.jms.core.JmsTemplate; |
| import org.springframework.jms.support.JmsHeaders; |
| |
| import javax.jms.Session; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * Consuming JMS processor which upon each invocation of |
| * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a |
| * {@link FlowFile} containing the body of the consumed JMS message and JMS |
| * properties that came with message which are added to a {@link FlowFile} as |
| * attributes. |
| */ |
| @Tags({ "jms", "get", "message", "receive", "consume" }) |
| @InputRequirement(Requirement.INPUT_FORBIDDEN) |
| @CapabilityDescription("Consumes JMS Message of type BytesMessage, TextMessage, ObjectMessage, MapMessage or StreamMessage transforming its content to " |
| + "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes. " |
| + "MapMessages will be transformed into JSONs and then into byte arrays. The other types will have their raw contents as byte array transferred into the flowfile.") |
| @WritesAttributes({ |
| @WritesAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = "The JMSDeliveryMode from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.EXPIRATION, description = "The JMSExpiration from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.PRIORITY, description = "The JMSPriority from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.REDELIVERED, description = "The JMSRedelivered from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.TIMESTAMP, description = "The JMSTimestamp from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.CORRELATION_ID, description = "The JMSCorrelationID from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.MESSAGE_ID, description = "The JMSMessageID from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.TYPE, description = "The JMSType from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.REPLY_TO, description = "The JMSReplyTo from the message header."), |
| @WritesAttribute(attribute = JmsHeaders.DESTINATION, description = "The JMSDestination from the message header."), |
| @WritesAttribute(attribute = ConsumeJMS.JMS_MESSAGETYPE, description = "The JMS message type, can be TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage)."), |
| @WritesAttribute(attribute = "other attributes", description = "Each message property is written to an attribute.") |
| }) |
| @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) |
| public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { |
| public static final String JMS_MESSAGETYPE = "jms.messagetype"; |
| |
| static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE), |
| "AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")", |
| "Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. " |
| + "Can result in data loss in the event where NiFi abruptly stopped before session was commited."); |
| |
| static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE), |
| "CLIENT_ACKNOWLEDGE (" + String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")", |
| "(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss"); |
| |
| static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE), |
| "DUPS_OK_ACKNOWLEDGE (" + String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")", |
| "This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data " |
| + "duplication and data loss while achieving the best throughput."); |
| |
| public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination"; |
| |
| static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder() |
| .name("Acknowledgement Mode") |
| .description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide " |
| + "better performance than Client Acknowledge.") |
| .required(true) |
| .allowableValues(AUTO_ACK, CLIENT_ACK, DUPS_OK) |
| .defaultValue(CLIENT_ACK.getValue()) |
| .build(); |
| |
| static final PropertyDescriptor DURABLE_SUBSCRIBER = new PropertyDescriptor.Builder() |
| .name("Durable subscription") |
| .description("If destination is Topic if present then make it the consumer durable. " + |
| "@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-") |
| .required(false) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| static final PropertyDescriptor SHARED_SUBSCRIBER = new PropertyDescriptor.Builder() |
| .name("Shared subscription") |
| .description("If destination is Topic if present then make it the consumer shared. " + |
| "@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-") |
| .required(false) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| static final PropertyDescriptor SUBSCRIPTION_NAME = new PropertyDescriptor.Builder() |
| .name("Subscription Name") |
| .description("The name of the subscription to use if destination is Topic and is shared or durable.") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() |
| .name("Timeout") |
| .description("How long to wait to consume a message from the remote broker before giving up.") |
| .required(true) |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .defaultValue("1 sec") |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| static final PropertyDescriptor ERROR_QUEUE = new PropertyDescriptor.Builder() |
| .name("Error Queue Name") |
| .description("The name of a JMS Queue where - if set - unprocessed messages will be routed. Usually provided by the administrator (e.g., 'queue://myErrorQueue' or 'myErrorQueue')." + |
| "Only applicable if 'Destination Type' is set to 'QUEUE'") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder() |
| .name("success") |
| .description("All FlowFiles that are received from the JMS Destination are routed to this relationship") |
| .build(); |
| |
| private final static Set<Relationship> relationships; |
| |
| private final static List<PropertyDescriptor> thisPropertyDescriptors; |
| |
| static { |
| List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); |
| _propertyDescriptors.addAll(propertyDescriptors); |
| _propertyDescriptors.remove(MESSAGE_BODY); |
| _propertyDescriptors.remove(ALLOW_ILLEGAL_HEADER_CHARS); |
| _propertyDescriptors.remove(ATTRIBUTES_AS_HEADERS_REGEX); |
| |
| // change the validator on CHARSET property |
| _propertyDescriptors.remove(CHARSET); |
| PropertyDescriptor CHARSET_WITH_EL_VALIDATOR_PROPERTY = new PropertyDescriptor.Builder().fromPropertyDescriptor(CHARSET) |
| .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR_WITH_EVALUATION) |
| .build(); |
| _propertyDescriptors.add(CHARSET_WITH_EL_VALIDATOR_PROPERTY); |
| |
| _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE); |
| _propertyDescriptors.add(DURABLE_SUBSCRIBER); |
| _propertyDescriptors.add(SHARED_SUBSCRIBER); |
| _propertyDescriptors.add(SUBSCRIPTION_NAME); |
| _propertyDescriptors.add(TIMEOUT); |
| _propertyDescriptors.add(ERROR_QUEUE); |
| thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); |
| |
| Set<Relationship> _relationships = new HashSet<>(); |
| _relationships.add(REL_SUCCESS); |
| relationships = Collections.unmodifiableSet(_relationships); |
| } |
| |
| @Override |
| protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { |
| final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext)); |
| |
| String destinationType = validationContext.getProperty(DESTINATION_TYPE).getValue(); |
| String errorQueue = validationContext.getProperty(ERROR_QUEUE).getValue(); |
| |
| if (errorQueue != null && !QUEUE.equals(destinationType)) { |
| validationResults.add(new ValidationResult.Builder() |
| .valid(false) |
| .subject(ERROR_QUEUE.getDisplayName()) |
| .explanation("'" + ERROR_QUEUE.getDisplayName() + "' is applicable only when " + |
| "'" + DESTINATION_TYPE.getDisplayName() + "'='" + QUEUE + "'") |
| .build()); |
| } |
| |
| return validationResults; |
| } |
| |
| /** |
| * Will construct a {@link FlowFile} containing the body of the consumed JMS |
| * message (if {@link JMSResponse} returned by {@link JMSConsumer} is not |
| * null) and JMS properties that came with message which are added to a |
| * {@link FlowFile} as attributes, transferring {@link FlowFile} to |
| * 'success' {@link Relationship}. |
| */ |
| @Override |
| protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException { |
| final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); |
| final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue(); |
| final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); |
| final boolean durable = durableBoolean == null ? false : durableBoolean; |
| final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); |
| final boolean shared = sharedBoolean == null ? false : sharedBoolean; |
| final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue(); |
| final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); |
| |
| try { |
| consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() { |
| @Override |
| public void accept(final JMSResponse response) { |
| if (response == null) { |
| return; |
| } |
| |
| FlowFile flowFile = processSession.create(); |
| flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody())); |
| |
| final Map<String, String> jmsHeaders = response.getMessageHeaders(); |
| final Map<String, String> jmsProperties = response.getMessageProperties(); |
| |
| flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); |
| flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); |
| flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); |
| |
| processSession.getProvenanceReporter().receive(flowFile, destinationName); |
| processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType()); |
| processSession.transfer(flowFile, REL_SUCCESS); |
| processSession.commit(); |
| } |
| }); |
| } catch(Exception e) { |
| consumer.setValid(false); |
| throw e; // for backward compatibility with exception handling in flows |
| } |
| } |
| |
| /** |
| * Will create an instance of {@link JMSConsumer} |
| */ |
| @Override |
| protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) { |
| int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger(); |
| jmsTemplate.setSessionAcknowledgeMode(ackMode); |
| |
| long timeout = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); |
| jmsTemplate.setReceiveTimeout(timeout); |
| |
| return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger()); |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| return relationships; |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return thisPropertyDescriptors; |
| } |
| |
| /** |
| * Copies JMS attributes (i.e., headers and properties) as FF attributes. |
| * Given that FF attributes mandate that values are of type String, the |
| * copied values of JMS attributes will be "stringified" via |
| * String.valueOf(attribute). |
| */ |
| private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, String> jmsAttributes, FlowFile flowFile, ProcessSession processSession) { |
| Map<String, String> attributes = new HashMap<>(); |
| for (Entry<String, String> entry : jmsAttributes.entrySet()) { |
| attributes.put(entry.getKey(), entry.getValue()); |
| } |
| |
| flowFile = processSession.putAllAttributes(flowFile, attributes); |
| return flowFile; |
| } |
| } |