blob: 2d8f96f5f4b01d3d0009c0e526fc421a308bcaf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.util.StopWatch;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.nifi.processors.standard.util.JmsProperties.ACKNOWLEDGEMENT_MODE;
import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CLIENT;
import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES;
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
public abstract class JmsConsumer extends AbstractProcessor {
public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are routed to success")
.build();
private final Set<Relationship> relationships;
private final List<PropertyDescriptor> propertyDescriptors;
public JmsConsumer() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(JMS_PROVIDER);
descriptors.add(URL);
descriptors.add(DESTINATION_NAME);
descriptors.add(TIMEOUT);
descriptors.add(BATCH_SIZE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(ACKNOWLEDGEMENT_MODE);
descriptors.add(MESSAGE_SELECTOR);
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);
descriptors.add(CLIENT_ID_PREFIX);
this.propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
public void consume(final ProcessContext context, final ProcessSession session, final WrappedMessageConsumer wrappedConsumer) throws ProcessException {
final ComponentLog logger = getLogger();
final MessageConsumer consumer = wrappedConsumer.getConsumer();
final boolean clientAcknowledge = context.getProperty(ACKNOWLEDGEMENT_MODE).getValue().equalsIgnoreCase(ACK_MODE_CLIENT);
final long timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
final StopWatch stopWatch = new StopWatch(true);
for (int i = 0; i < batchSize; i++) {
final Message message;
try {
// If we haven't received a message, wait until one is available. If we have already received at least one
// message, then we are not willing to wait for more to become available, but we are willing to keep receiving
// all messages that are immediately available.
if (processingSummary.getMessagesReceived() == 0) {
message = consumer.receive(timeout);
} else {
message = consumer.receiveNoWait();
}
} catch (final JMSException e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
break;
}
if (message == null) { // if no messages, we're done
break;
}
try {
processingSummary.add(map2FlowFile(context, session, message, addAttributes, logger));
} catch (Exception e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
break;
}
}
if (processingSummary.getFlowFilesCreated() == 0) {
context.yield();
return;
}
session.commitAsync(() -> {
// if we need to acknowledge the messages, do so now.
final Message lastMessage = processingSummary.getLastMessageReceived();
if (clientAcknowledge && lastMessage != null) {
try {
lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last.
} catch (final JMSException e) {
logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}",
new Object[]{processingSummary.getMessagesReceived(), e});
}
}
});
stopWatch.stop();
if (processingSummary.getFlowFilesCreated() > 0) {
final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
float messagesPerSec = (processingSummary.getMessagesReceived()) / secs;
final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}",
new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
}
}
public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ComponentLog logger)
throws Exception {
// Currently not very useful, because always one Message == one FlowFile
final AtomicInteger msgsThisFlowFile = new AtomicInteger(1);
FlowFile flowFile = session.create();
try {
// MapMessage is exception, add only name-value pairs to FlowFile attributes
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
} else { // all other message types, write Message body to FlowFile content
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
final byte[] messageBody = JmsFactory.createByteArray(message);
out.write(messageBody);
} catch (final JMSException e) {
throw new ProcessException("Failed to receive JMS Message due to " + e.getMessage(), e);
}
}
});
}
if (addAttributes) {
flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
}
session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'",
new Object[]{flowFile, msgsThisFlowFile.get()});
return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
} catch (Exception e) {
session.remove(flowFile);
throw e;
}
}
public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
final Map<String, String> valueMap = new HashMap<>();
final Enumeration<?> enumeration = mapMessage.getMapNames();
while (enumeration.hasMoreElements()) {
final String name = (String) enumeration.nextElement();
final Object value = mapMessage.getObject(name);
if (value == null) {
valueMap.put(MAP_MESSAGE_PREFIX + name, "");
} else {
valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString());
}
}
return valueMap;
}
}