/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@SupportsBatching
@Tags({"ingest", "udp", "listen", "source", "record"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Listens for UDP datagrams on a given port and reads the content of each datagram using the " +
        "configured Record Reader. Each record is then written to a FlowFile using the configured Record Writer. This processor " +
        "can be restricted to listening for datagrams from a specific remote host and port by specifying the Sending Host and " +
        "Sending Host Port properties; otherwise it will listen for datagrams from all hosts and ports.")
@WritesAttributes({
@WritesAttribute(attribute="udp.sender", description="The sending host of the messages."),
@WritesAttribute(attribute="udp.port", description="The sending port the messages were received."),
@WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
@WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
})
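/*
 * Illustrative example (an assumption, not normative: a JSON record reader and writer are
 * configured). A single datagram whose payload is
 *
 *     [{"id": 1, "msg": "a"}, {"id": 2, "msg": "b"}]
 *
 * is read as two records and written to one FlowFile carrying attributes such as
 * udp.sender=/10.1.2.3, udp.port=7778, record.count=2, and mime.type=application/json.
 */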
public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent> {
public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
.name("sending-host")
.displayName("Sending Host")
.description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
+ "be accepted. Improves Performance. May be a system property or an environment variable.")
.addValidator(new HostValidator())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
.name("sending-host-port")
.displayName("Sending Host Port")
.description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
+ "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for reading the content of incoming datagrams.")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before writing to a flow file.")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
public static final PropertyDescriptor POLL_TIMEOUT = new PropertyDescriptor.Builder()
.name("poll-timeout")
.displayName("Poll Timeout")
.description("The amount of time to wait when polling the internal queue for more datagrams. If no datagrams are found after waiting " +
"for the configured timeout, then the processor will emit whatever records have been obtained up to that point.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("50 ms")
.required(true)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("batch-size")
.displayName("Batch Size")
.description("The maximum number of datagrams to write as records to a single FlowFile. The Batch Size will only be reached when " +
"data is coming in more frequently than the Poll Timeout.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("1000")
.required(true)
.build();
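    // With the defaults above (Poll Timeout of 50 ms, Batch Size of 1000), a single onTrigger call
    // drains up to 1000 queued datagrams and returns early once the queue stays empty for 50 ms.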
public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a datagram cannot be parsed using the configured Record Reader, the contents of the "
+ "message will be routed to this Relationship as its own individual FlowFile.")
.build();
public static final String UDP_PORT_ATTR = "udp.port";
public static final String UDP_SENDER_ATTR = "udp.sender";
public static final String RECORD_COUNT_ATTR = "record.count";
private volatile long pollTimeout;
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
POLL_TIMEOUT,
BATCH_SIZE,
RECORD_READER,
RECORD_WRITER,
SENDING_HOST,
SENDING_HOST_PORT
);
}
@Override
protected List<Relationship> getAdditionalRelationships() {
return Arrays.asList(REL_PARSE_FAILURE);
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
super.onScheduled(context);
this.pollTimeout = context.getProperty(POLL_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
}
@Override
protected long getLongPollTimeout() {
return pollTimeout;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> result = new ArrayList<>();
final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
result.add(
new ValidationResult.Builder()
.subject(SENDING_HOST.getName())
.valid(false)
.explanation("Must specify Sending Host when specifying Sending Host Port")
.build());
} else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
result.add(
new ValidationResult.Builder()
.subject(SENDING_HOST_PORT.getName())
.valid(false)
.explanation("Must specify Sending Host Port when specifying Sending Host")
.build());
}
return result;
}
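    // For example (illustrative values): configuring Sending Host Port = 7777 while leaving
    // Sending Host blank marks the component invalid with an error on "sending-host", and vice versa.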
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
throws IOException {
final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort);
}
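    /*
     * Illustrative sender sketch (for testing only, not part of this processor; the host and port
     * are hypothetical and assume the processor is listening on localhost:7778):
     *
     *     final byte[] payload = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
     *     try (final DatagramSocket socket = new DatagramSocket()) {
     *         socket.send(new DatagramPacket(payload, payload.length,
     *                 InetAddress.getByName("localhost"), 7778));
     *     }
     */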
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int maxBatchSize = context.getProperty(BATCH_SIZE).asInteger();
final Map<String, FlowFileRecordWriter> flowFileRecordWriters = new HashMap<>();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        for (int i = 0; i < maxBatchSize; i++) {
// this processor isn't leveraging the error queue so don't bother polling to avoid the overhead
// if the error handling is ever changed to use the error queue then this flag needs to be changed as well
final StandardEvent event = getMessage(true, false, session);
// break out if we don't have any messages, don't yield since we already do a long poll inside getMessage
if (event == null) {
break;
}
// attempt to read all of the records from the current datagram into a list in memory so that we can ensure the
// entire datagram can be read as records, and if not transfer the whole thing to parse.failure
            final List<Record> records = new ArrayList<>();
            final byte[] data = event.getData();
            final long inputLength = data == null ? -1 : data.length;
            try (final InputStream in = new ByteArrayInputStream(data);
                    final RecordReader reader = readerFactory.createRecordReader(Collections.emptyMap(), in, inputLength, getLogger())) {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    records.add(record);
                }
            } catch (final Exception e) {
                handleParseFailure(event, session, e);
                continue;
            }
            if (records.isEmpty()) {
handleParseFailure(event, session, null);
continue;
}
// see if we already started a flow file and writer for the given sender
// if an exception happens creating the flow file or writer, put the event in the error queue to try it again later
FlowFileRecordWriter flowFileRecordWriter = flowFileRecordWriters.get(event.getSender());
if (flowFileRecordWriter == null) {
FlowFile flowFile = null;
OutputStream rawOut = null;
RecordSetWriter writer = null;
try {
flowFile = session.create();
rawOut = session.write(flowFile);
final Record firstRecord = records.get(0);
final RecordSchema recordSchema = firstRecord.getSchema();
final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut, flowFile);
writer.beginRecordSet();
flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer);
flowFileRecordWriters.put(event.getSender(), flowFileRecordWriter);
} catch (final Exception ex) {
getLogger().error("Failed to properly initialize record writer. Datagram will be queued for re-processing.", ex);
try {
if (writer != null) {
writer.close();
}
} catch (final Exception e) {
getLogger().warn("Failed to close Record Writer", e);
}
if (rawOut != null) {
IOUtils.closeQuietly(rawOut);
}
if (flowFile != null) {
session.remove(flowFile);
}
context.yield();
break;
}
}
// attempt to write each record, if any record fails then remove the flow file and break out of the loop
final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
try {
for (final Record record : records) {
writer.write(record);
}
            } catch (final Exception e) {
                getLogger().error("Failed to write records", e);
IOUtils.closeQuietly(writer);
session.remove(flowFileRecordWriter.getFlowFile());
flowFileRecordWriters.remove(event.getSender());
break;
}
}
// attempt to finish each record set and transfer the flow file, if an error is encountered calling
// finishRecordSet or closing the writer then remove the flow file
        for (final Map.Entry<String, FlowFileRecordWriter> entry : flowFileRecordWriters.entrySet()) {
final String sender = entry.getKey();
final FlowFileRecordWriter flowFileRecordWriter = entry.getValue();
final RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
FlowFile flowFile = flowFileRecordWriter.getFlowFile();
try {
final WriteResult writeResult;
try {
writeResult = writer.finishRecordSet();
} finally {
writer.close();
}
if (writeResult.getRecordCount() == 0) {
session.remove(flowFile);
continue;
}
                final Map<String, String> attributes = new HashMap<>(getAttributes(sender));
attributes.putAll(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.getRecordCount()));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final String transitUri = getTransitUri(sender);
session.getProvenanceReporter().receive(flowFile, transitUri);
} catch (final Exception e) {
getLogger().error("Unable to properly complete record set due to: " + e.getMessage(), e);
session.remove(flowFile);
}
}
}
private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause) {
handleParseFailure(event, session, cause, "Failed to parse datagram using the configured Record Reader. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
}
private void handleParseFailure(final StandardEvent event, final ProcessSession session, final Exception cause, final String message) {
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(event.getSender());
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(event.getData()));
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
final String transitUri = getTransitUri(event.getSender());
session.getProvenanceReporter().receive(failureFlowFile, transitUri);
session.transfer(failureFlowFile, REL_PARSE_FAILURE);
if (cause == null) {
getLogger().error(message);
} else {
getLogger().error(message, cause);
}
session.adjustCounter("Parse Failures", 1, false);
}
private Map<String, String> getAttributes(final String sender) {
final Map<String, String> attributes = new HashMap<>(3);
attributes.put(UDP_SENDER_ATTR, sender);
attributes.put(UDP_PORT_ATTR, String.valueOf(port));
return attributes;
}
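    // For example (illustrative values): a sender reported as "/10.1.2.3" with the processor
    // listening on port 7778 yields the transit URI "udp://10.1.2.3:7778".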
    private String getTransitUri(final String sender) {
        final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return "udp://" + senderHost + ":" + port;
    }
/**
* Holder class to pass around a flow file and the writer that is writing records to it.
*/
private static class FlowFileRecordWriter {
private final FlowFile flowFile;
private final RecordSetWriter recordWriter;
public FlowFileRecordWriter(final FlowFile flowFile, final RecordSetWriter recordWriter) {
this.flowFile = flowFile;
this.recordWriter = recordWriter;
}
public FlowFile getFlowFile() {
return flowFile;
}
public RecordSetWriter getRecordWriter() {
return recordWriter;
}
}
private static class HostValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
try {
InetAddress.getByName(input);
return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
} catch (final UnknownHostException e) {
return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
}
}
}
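    /*
     * Minimal usage sketch with the NiFi mock framework (assumptions: nifi-mock and the JSON
     * record services are on the classpath; the port and service names are hypothetical):
     *
     *     final TestRunner runner = TestRunners.newTestRunner(ListenUDPRecord.class);
     *     runner.setProperty(ListenUDPRecord.PORT, "7778");
     *
     *     final JsonTreeReader reader = new JsonTreeReader();
     *     runner.addControllerService("reader", reader);
     *     runner.enableControllerService(reader);
     *     runner.setProperty(ListenUDPRecord.RECORD_READER, "reader");
     *
     *     final JsonRecordSetWriter writer = new JsonRecordSetWriter();
     *     runner.addControllerService("writer", writer);
     *     runner.enableControllerService(writer);
     *     runner.setProperty(ListenUDPRecord.RECORD_WRITER, "writer");
     *
     *     runner.run(1, false, true); // initialize, trigger once, keep the socket open
     */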
}