blob: 41afeeddb13b6ed7c4edb6f9d348b397517ef395 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.put;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* A base class for processors that send data to an external system using TCP or UDP.
*/
public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor {
/**
* Host name or IP address of the destination. Required, defaults to {@code localhost}, and
* supports Expression Language at variable-registry scope. Added to the supported properties
* by {@code init}.
*/
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the destination.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/**
* Port on the destination. Required, has no default value, and supports Expression Language
* at variable-registry scope. Added to the supported properties by {@code init}.
*/
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port on the destination.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/**
* Requested size of the socket send buffer. Required, defaults to {@code 1 MB}.
* {@code getEventSender} converts the value to bytes and hands it to the sender factory.
* Added to the supported properties by {@code init}.
*
* NOTE(review): the user-facing description below talks about data being read and incoming
* data being dropped, which reads like receive-buffer wording - confirm the intended text
* for a send buffer.
*/
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.build();
/**
* How long an unused connection may stay open. Required, defaults to {@code 5 seconds}.
* Added to the supported properties by {@code init}, but not read by any method of this
* class; presumably consumed by subclasses - TODO confirm.
*/
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection.")
.required(true)
.defaultValue("5 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
// Putting these properties here so sub-classes don't have to redefine them, but they are
// not added to the properties by default since not all processors may need them
/** Allowable value of {@link #PROTOCOL} selecting TCP. */
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
/** Allowable value of {@link #PROTOCOL} selecting UDP. */
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
/**
* Transport protocol selection. Required, restricted to {@link #TCP_VALUE} and
* {@link #UDP_VALUE}, defaulting to TCP. Read by {@code getProtocol}.
*/
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
.Builder().name("Protocol")
.description("The protocol for communication.")
.required(true)
.allowableValues(TCP_VALUE, UDP_VALUE)
.defaultValue(TCP_VALUE.getValue())
.build();
/**
* Optional delimiter used to split one FlowFile into several messages. Supports Expression
* Language against FlowFile attributes. Not read by any method of this class.
*/
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile "
+ "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+ "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
+ "relationship.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
* Character set of the data being sent. Required, defaults to {@code UTF-8}, and supports
* Expression Language at variable-registry scope. Not read by any method of this class.
*/
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the data being sent.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/**
* Connect/communication timeout. Not required, defaults to {@code 10 seconds}, and supports
* Expression Language at variable-registry scope. Unlike the other descriptors in this group,
* it is added to the supported properties by {@code init}; {@code getEventSender} applies it
* to the sender factory.
*/
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
.required(false)
.defaultValue("10 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/**
* Optional delimiter appended to outgoing messages. Supports Expression Language against
* FlowFile attributes. Read by {@code getOutgoingMessageDelimiter}, which turns the escaped
* forms of newline, carriage return and tab into the real characters.
*/
public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Outgoing Message Delimiter")
.description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message "
+ "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
+ "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can "
+ "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
* Whether every FlowFile is sent on its own connection. Required, {@code true}/{@code false},
* defaulting to {@code false}. Read by {@code getEventSender}, which treats a missing value
* as {@code false}.
*/
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("Connection Per FlowFile")
.description("Specifies whether to send each FlowFile's content on an individual connection.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.build();
/**
* Optional reference to an {@link SSLContextService}. When set, {@code getEventSender}
* creates an {@code SSLContext} from the service and configures the sender factory with it.
*/
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be sent over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
/**
* Relationship for FlowFiles that were sent successfully. Always registered by {@code init}.
*/
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
.build();
/**
* Relationship for FlowFiles that could not be sent. Always registered by {@code init}.
*/
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are sent out this relationship.")
.build();
// Unmodifiable set assembled in init() and returned as-is by getRelationships().
private Set<Relationship> relationships;
// Unmodifiable list assembled in init() and returned as-is by getSupportedPropertyDescriptors().
private List<PropertyDescriptor> descriptors;
// Assigned in onScheduled() from createTransitUri(); used by FlowFileMessageBatch when
// reporting provenance send events.
protected volatile String transitUri;
// Assigned in onScheduled() from getEventSender() and closed in closeSenders(). Declared as a
// raw type; the element type depends on the factory a subclass supplies.
protected EventSender eventSender;
// Batches are appended here by FlowFileMessageBatch once all of their messages are accounted
// for. Nothing in this class drains the queue; presumably subclasses do - verify against them.
protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
// Synchronized set from which FlowFileMessageBatch removes itself on completion. Additions are
// not visible in this class; presumably subclasses register batches here - verify.
protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<>());
/**
 * Builds the immutable property and relationship collections exposed by this processor.
 * The common connection properties and the success/failure relationships come first,
 * followed by whatever {@link #getAdditionalProperties()} and
 * {@link #getAdditionalRelationships()} contribute.
 *
 * @param context the initialization context (not used by this implementation)
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
    supportedProperties.add(HOSTNAME);
    supportedProperties.add(PORT);
    supportedProperties.add(MAX_SOCKET_SEND_BUFFER_SIZE);
    supportedProperties.add(IDLE_EXPIRATION);
    supportedProperties.add(TIMEOUT);
    supportedProperties.addAll(getAdditionalProperties());
    descriptors = Collections.unmodifiableList(supportedProperties);

    final Set<Relationship> supportedRelationships = new HashSet<>();
    supportedRelationships.add(REL_SUCCESS);
    supportedRelationships.add(REL_FAILURE);
    supportedRelationships.addAll(getAdditionalRelationships());
    relationships = Collections.unmodifiableSet(supportedRelationships);
}
/**
 * Override to provide additional relationships for the processor. The returned relationships
 * are added after {@code REL_SUCCESS} and {@code REL_FAILURE} during {@code init}.
 *
 * @return a list of relationships; this default implementation returns an immutable empty list
 */
protected List<Relationship> getAdditionalRelationships() {
    // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
    return Collections.emptyList();
}
/**
 * Override to provide additional properties for the processor. The returned descriptors are
 * appended after the common connection properties during {@code init}.
 *
 * @return a list of properties; this default implementation returns an immutable empty list
 */
protected List<PropertyDescriptor> getAdditionalProperties() {
    // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
    return Collections.emptyList();
}
/**
 * Returns the relationships assembled by {@code init}.
 *
 * @return the unmodifiable set of relationships built during initialization
 */
@Override
public final Set<Relationship> getRelationships() {
    return relationships;
}
/**
 * Returns the property descriptors assembled by {@code init}.
 *
 * @return the unmodifiable list of descriptors built during initialization
 */
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return this.descriptors;
}
/**
 * Prepares the processor for a run: creates the event sender used to deliver messages and
 * computes the transit URI reported in provenance events.
 *
 * @param context the current process context
 * @throws IOException declared by the signature; not thrown directly by this implementation
 */
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
    // A single sender is created up front; the transit URI is resolved only after that succeeds.
    eventSender = getEventSender(context);
    transitUri = createTransitUri(context);
}
/**
 * Closes the event sender created in {@code onScheduled}, if there is one.
 *
 * @throws Exception if closing the sender fails
 */
@OnStopped
public void closeSenders() throws Exception {
    if (eventSender == null) {
        // Nothing was created (or scheduling never happened), so there is nothing to release.
        return;
    }
    eventSender.close();
}
/**
* Sub-classes construct a transit uri for provenance events. Called from the @OnScheduled
* method of this class, after the event sender has been created; the result is stored in
* {@code transitUri} and passed to the provenance reporter when messages are sent.
*
* @param context the current context
*
* @return the transit uri
*/
protected abstract String createTransitUri(final ProcessContext context);
/**
 * Creates the event sender used to deliver messages to the configured destination.
 *
 * <p>The sender factory is obtained from
 * {@link #getNettyEventSenderFactory(String, int, String)} and configured from the process
 * context: thread name prefix, worker threads and maximum connections (both set to the maximum
 * number of concurrent tasks), socket send buffer size, connection-per-FlowFile mode, timeout,
 * and - when an SSL Context Service is set - the SSL context.
 *
 * @param context the current process context
 * @return the event sender produced by the configured factory
 */
protected EventSender<?> getEventSender(final ProcessContext context) {
    final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
    final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
    final String protocol = getProtocol(context);

    // CONNECTION_PER_FLOWFILE is not registered by every subclass, so a missing value means "false".
    final PropertyValue connectionPerFlowFile = context.getProperty(CONNECTION_PER_FLOWFILE);
    final boolean singleEventPerConnection = connectionPerFlowFile.getValue() != null && connectionPerFlowFile.asBoolean();

    final NettyEventSenderFactory<?> factory = getNettyEventSenderFactory(hostname, port, protocol);
    factory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
    factory.setWorkerThreads(context.getMaxConcurrentTasks());
    factory.setMaxConnections(context.getMaxConcurrentTasks());
    factory.setSocketSendBufferSize(context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue());
    factory.setSingleEventPerConnection(singleEventPerConnection);

    // Keep the period as a long: Duration.ofMillis takes a long, and narrowing to int would
    // silently wrap for timeouts above Integer.MAX_VALUE milliseconds.
    final long timeoutMillis = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
    factory.setTimeout(Duration.ofMillis(timeoutMillis));

    final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
    if (sslContextServiceProperty.isSet()) {
        final SSLContextService sslContextService = sslContextServiceProperty.asControllerService(SSLContextService.class);
        final SSLContext sslContext = sslContextService.createContext();
        factory.setSslContext(sslContext);
    }

    return factory.getEventSender();
}
/**
 * Immutable pair of positions delimiting one message within a FlowFile. The enclosing
 * processor's batch logic uses {@code start} as the offset and {@code end - start} as the
 * length when cloning the FlowFile content for the range.
 */
protected static class Range {
    private final long start;
    private final long end;

    /**
     * @param start position at which the range begins
     * @param end position at which the range ends
     */
    public Range(final long start, final long end) {
        this.start = start;
        this.end = end;
    }

    /** @return the position at which the range begins */
    public long getStart() {
        return this.start;
    }

    /** @return the position at which the range ends */
    public long getEnd() {
        return this.end;
    }

    /** @return a description of the form {@code Range[start-end]} */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("Range[");
        description.append(start).append('-').append(end).append(']');
        return description.toString();
    }
}
/**
 * A wrapper to hold the ranges of a FlowFile that were successful and ranges that failed, and
 * then transfer those ranges appropriately.
 *
 * <p>A batch is complete once the expected number of messages has been set and at least that
 * many ranges (successful plus failed) have been recorded. On completion the batch removes
 * itself from {@code activeBatches} and appends itself to {@code completeBatches}. All public
 * methods are {@code synchronized} on the batch instance.
 */
protected class FlowFileMessageBatch {
    private final ProcessSession session;
    private final FlowFile flowFile;
    private final long startTime = System.nanoTime();
    private final List<Range> successfulRanges = new ArrayList<>();
    private final List<Range> failedRanges = new ArrayList<>();

    private Exception lastFailureReason;
    // -1 means the expected message count has not been set yet.
    private long numMessages = -1L;
    private long completeTime = 0L;
    private boolean canceled = false;

    /**
     * @param session the session that owns {@code flowFile}
     * @param flowFile the FlowFile whose messages are being tracked
     */
    public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile) {
        this.session = session;
        this.flowFile = flowFile;
    }

    /**
     * Finishes the batch if every message is accounted for; otherwise cancels it by rolling
     * back the session and discarding the recorded ranges. After cancellation, later range
     * reports and {@link #completeSession()} calls are ignored.
     */
    public synchronized void cancelOrComplete() {
        if (isComplete()) {
            completeSession();
            return;
        }

        this.canceled = true;
        session.rollback();
        successfulRanges.clear();
        failedRanges.clear();
    }

    /**
     * Records a range that was sent successfully. Ignored when the batch has been canceled.
     *
     * @param start start of the range
     * @param end end of the range
     */
    public synchronized void addSuccessfulRange(final long start, final long end) {
        if (canceled) {
            return;
        }

        successfulRanges.add(new Range(start, end));
        publishIfComplete();
    }

    /**
     * Records a range that failed to send, remembering the cause as the most recent failure.
     * Ignored when the batch has been canceled.
     *
     * @param start start of the range
     * @param end end of the range
     * @param e the failure reported for this range
     */
    public synchronized void addFailedRange(final long start, final long end, final Exception e) {
        if (canceled) {
            return;
        }

        failedRanges.add(new Range(start, end));
        lastFailureReason = e;
        publishIfComplete();
    }

    /**
     * Sets how many messages this batch expects in total.
     *
     * @param msgCount the expected number of messages
     */
    public synchronized void setNumMessages(final long msgCount) {
        this.numMessages = msgCount;
        publishIfComplete();
    }

    private boolean isComplete() {
        return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
    }

    // Moves this batch from the active set to the completed queue once all messages are accounted for.
    private void publishIfComplete() {
        if (!isComplete()) {
            return;
        }

        // Stamp the completion time before the batch becomes visible on the queue.
        completeTime = System.nanoTime();
        activeBatches.remove(this);
        completeBatches.add(this);
    }

    // Sorts the ranges by start, merges contiguous ones, and transfers a clone of each merged range.
    private void transferRanges(final List<Range> ranges, final Relationship relationship) {
        ranges.sort(Comparator.comparingLong(Range::getStart));

        for (int i = 0; i < ranges.size(); i++) {
            Range range = ranges.get(i);
            int count = 1;

            while (i + 1 < ranges.size()) {
                // Check if the next range in the List continues where this one left off.
                final Range nextRange = ranges.get(i + 1);
                if (nextRange.getStart() != range.getEnd()) {
                    break;
                }

                // We have two ranges in a row that are contiguous; combine them into a single Range.
                range = new Range(range.getStart(), nextRange.getEnd());
                count++;
                i++;
            }

            // Create a FlowFile for this range.
            FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart());
            if (REL_SUCCESS.equals(relationship)) {
                session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages");
            } else {
                child = session.penalize(child);
            }
            session.transfer(child, relationship);
        }
    }

    /**
     * Routes the FlowFile according to the recorded ranges and commits the session
     * asynchronously. Does nothing when the batch has been canceled.
     *
     * <ul>
     * <li>No ranges recorded: the FlowFile goes to {@code REL_SUCCESS}.</li>
     * <li>Only failed ranges: the FlowFile is penalized and goes to {@code REL_FAILURE}.</li>
     * <li>Only successful ranges: a provenance send event is reported and the FlowFile goes to
     * {@code REL_SUCCESS}.</li>
     * <li>Both: the FlowFile is split into clones per contiguous range, routed to the matching
     * relationship, and the original FlowFile is removed.</li>
     * </ul>
     */
    public synchronized void completeSession() {
        if (canceled) {
            return;
        }

        if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
            getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile});
            session.transfer(flowFile, REL_SUCCESS);
            session.commitAsync();
            return;
        }

        if (successfulRanges.isEmpty()) {
            getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
            final FlowFile penalizedFlowFile = session.penalize(flowFile);
            session.transfer(penalizedFlowFile, REL_FAILURE);
            session.commitAsync();
            return;
        }

        if (failedRanges.isEmpty()) {
            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
            session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis);
            session.transfer(flowFile, REL_SUCCESS);
            getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
            session.commitAsync();
            return;
        }

        // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way
        // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success'
        // and the failed messages to 'failure'.
        transferRanges(successfulRanges, REL_SUCCESS);
        transferRanges(failedRanges, REL_FAILURE);
        session.remove(flowFile);
        getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}",
                new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
        session.commitAsync();
    }
}
/**
 * Resolves the "Outgoing Message Delimiter" property for a FlowFile and converts the escaped
 * forms of newline, carriage return and tab (backslash followed by {@code n}, {@code r} or
 * {@code t}) into the corresponding control characters.
 *
 * @param context
 *            - the current process context.
 * @param flowFile
 *            - the FlowFile whose attributes are used to evaluate the property.
 *
 * @return the delimiter with escapes converted, or {@code null} when the property has no value.
 */
protected String getOutgoingMessageDelimiter(final ProcessContext context, final FlowFile flowFile) {
    final String configured = context.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
    if (configured == null) {
        return null;
    }

    final String withNewlines = configured.replace("\\n", "\n");
    final String withCarriageReturns = withNewlines.replace("\\r", "\r");
    return withCarriageReturns.replace("\\t", "\t");
}
/**
 * Reads the configured value of the {@code PROTOCOL} property.
 *
 * @param context the current process context
 * @return the raw property value
 */
protected String getProtocol(final ProcessContext context) {
    final PropertyValue protocolProperty = context.getProperty(PROTOCOL);
    return protocolProperty.getValue();
}
/**
 * Creates the factory that {@code getEventSender} configures and uses to build the event
 * sender. This implementation produces a byte-array based factory.
 *
 * @param hostname destination host
 * @param port destination port
 * @param protocol name of the transport protocol, resolved via {@code TransportProtocol.valueOf}
 * @return a new sender factory for the given destination
 */
protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
    final TransportProtocol transportProtocol = TransportProtocol.valueOf(protocol);
    return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, port, transportProtocol);
}
}