blob: 76795f0685be8e95662632aed419f2e72df9bace [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@SupportsBatching
@Tags({"ingest", "udp", "listen", "source"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Listens for Datagram Packets on a given port. The default behavior produces a FlowFile " +
"per datagram, however for higher throughput the Max Batch Size property may be increased to specify the number of " +
"datagrams to batch together in a single FlowFile. This processor can be restricted to listening for datagrams from a " +
"specific remote host and port by specifying the Sending Host and Sending Host Port properties, otherwise it will listen " +
"for datagrams from all hosts and ports.")
@WritesAttributes({
@WritesAttribute(attribute="udp.sender", description="The sending host of the messages."),
@WritesAttribute(attribute="udp.port", description="The sending port the messages were received.")
})
public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEvent> {
public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
.name("Sending Host")
.description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
+ "be accepted. Improves Performance. May be a system property or an environment variable.")
.addValidator(new HostValidator())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
.name("Sending Host Port")
.description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
+ "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final String UDP_PORT_ATTR = "udp.port";
public static final String UDP_SENDER_ATTR = "udp.sender";
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(
SENDING_HOST,
SENDING_HOST_PORT
);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> result = new ArrayList<>();
final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
result.add(
new ValidationResult.Builder()
.subject(SENDING_HOST.getName())
.valid(false)
.explanation("Must specify Sending Host when specifying Sending Host Port")
.build());
} else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
result.add(
new ValidationResult.Builder()
.subject(SENDING_HOST_PORT.getName())
.valid(false)
.explanation("Must specify Sending Host Port when specifying Sending Host")
.build());
}
return result;
}
@Override
protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<StandardEvent> events)
throws IOException {
final String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
final Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final ByteBufferSource byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
final EventFactory<StandardEvent> eventFactory = new StandardEventFactory();
return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, events, getLogger(), sendingHost, sendingHostPort);
}
@Override
protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
attributes.put(UDP_SENDER_ATTR, sender);
attributes.put(UDP_PORT_ATTR, String.valueOf(port));
return attributes;
}
@Override
protected String getTransitUri(FlowFileEventBatch batch) {
final String sender = batch.getEvents().get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
final String transitUri = new StringBuilder().append("udp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
public static class HostValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
try {
InetAddress.getByName(input);
return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
} catch (final UnknownHostException e) {
return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + e).build();
}
}
}
}