blob: b9fcd2001f58913038652cfc5ef311afb60504d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.netty.StringNettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.syslog.parsers.SyslogParser;
import org.apache.nifi.util.StopWatch;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerWhenEmpty
@Tags({"syslog", "put", "udp", "tcp", "logs"})
@CapabilityDescription("Sends Syslog messages to a given host and port over TCP or UDP. Messages are constructed from the \"Message ___\" properties of the processor " +
"which can use expression language to generate messages from incoming FlowFiles. The properties are used to construct messages of the form: " +
"(<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The constructed messages are checked against regular expressions for " +
"RFC5424 and RFC3164 formatted messages. The timestamp can be an RFC5424 timestamp with a format of \"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", " +
"or it can be an RFC3164 timestamp with a format of \"MMM d HH:mm:ss\". If a message is constructed that does not form a valid Syslog message according to the " +
"above description, then it is routed to the invalid relationship. Valid messages are sent to the Syslog server and successes are routed to the success relationship, " +
"failures routed to the failure relationship.")
@SeeAlso({ListenSyslog.class, ParseSyslog.class})
public class PutSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The IP address or hostname of the Syslog server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor
.Builder().name("Batch Size")
.description("The number of incoming FlowFiles to process in a single execution of this processor.")
.required(true)
.defaultValue("25")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection.")
.required(true)
.defaultValue("5 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MSG_PRIORITY = new PropertyDescriptor
.Builder().name("Message Priority")
.description("The priority for the Syslog messages, excluding < >.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MSG_VERSION = new PropertyDescriptor
.Builder().name("Message Version")
.description("The version for the Syslog messages.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MSG_TIMESTAMP = new PropertyDescriptor
.Builder().name("Message Timestamp")
.description("The timestamp for the Syslog messages. The timestamp can be an RFC5424 timestamp with a format of " +
"\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " +
"with a format of \"MMM d HH:mm:ss\".")
.required(true)
.defaultValue("${now():format('MMM d HH:mm:ss')}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MSG_HOSTNAME = new PropertyDescriptor
.Builder().name("Message Hostname")
.description("The hostname for the Syslog messages.")
.required(true)
.defaultValue("${hostname(true)}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MSG_BODY = new PropertyDescriptor
.Builder().name("Message Body")
.description("The body for the Syslog messages.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " +
"messages will be sent over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to Syslog are sent out this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to Syslog are sent out this relationship.")
.build();
public static final Relationship REL_INVALID = new Relationship.Builder()
.name("invalid")
.description("FlowFiles that do not form a valid Syslog message are sent out this relationship.")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private EventSender<String> eventSender;
private String transitUri;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(HOSTNAME);
descriptors.add(PROTOCOL);
descriptors.add(PORT);
descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(IDLE_EXPIRATION);
descriptors.add(TIMEOUT);
descriptors.add(BATCH_SIZE);
descriptors.add(CHARSET);
descriptors.add(MSG_PRIORITY);
descriptors.add(MSG_VERSION);
descriptors.add(MSG_TIMESTAMP);
descriptors.add(MSG_HOSTNAME);
descriptors.add(MSG_BODY);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_INVALID);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String protocol = context.getProperty(PROTOCOL).getValue();
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) {
results.add(new ValidationResult.Builder()
.explanation("SSL can not be used with UDP")
.valid(false).subject("SSL Context").build());
}
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws InterruptedException {
eventSender = getEventSender(context);
final String protocol = context.getProperty(PROTOCOL).getValue();
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
transitUri = String.format("%s://%s:%s", protocol, hostname, port);
}
@OnStopped
public void onStopped() throws Exception {
if (eventSender != null) {
eventSender.close();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) {
context.yield();
} else {
for (final FlowFile flowFile : flowFiles) {
final StopWatch timer = new StopWatch(true);
final String syslogMessage = getSyslogMessage(context, flowFile);
if (isValid(syslogMessage)) {
try {
eventSender.sendEvent(syslogMessage);
timer.stop();
final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
session.getProvenanceReporter().send(flowFile, transitUri, duration, true);
getLogger().debug("Send Completed {}", flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Send Failed {}", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
}
} else {
getLogger().debug("Syslog Message Invalid {}", flowFile);
session.transfer(flowFile, REL_INVALID);
}
}
}
}
protected EventSender<String> getEventSender(final ProcessContext context) {
final TransportProtocol protocol = TransportProtocol.valueOf(context.getProperty(PROTOCOL).getValue());
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue());
final LineEnding lineEnding = TransportProtocol.TCP.equals(protocol) ? LineEnding.UNIX : LineEnding.NONE;
final StringNettyEventSenderFactory factory = new StringNettyEventSenderFactory(getLogger(), hostname, port, protocol, charset, lineEnding);
factory.setThreadNamePrefix(String.format("%s[%s]", PutSyslog.class.getSimpleName(), getIdentifier()));
factory.setWorkerThreads(context.getMaxConcurrentTasks());
factory.setMaxConnections(context.getMaxConcurrentTasks());
final int timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
factory.setTimeout(Duration.ofMillis(timeout));
final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {
final SSLContextService sslContextService = sslContextServiceProperty.asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService.createContext();
factory.setSslContext(sslContext);
}
return factory.getEventSender();
}
private String getSyslogMessage(final ProcessContext context, final FlowFile flowFile) {
final String priority = context.getProperty(MSG_PRIORITY).evaluateAttributeExpressions(flowFile).getValue();
final String version = context.getProperty(MSG_VERSION).evaluateAttributeExpressions(flowFile).getValue();
final String timestamp = context.getProperty(MSG_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
final String hostname = context.getProperty(MSG_HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final String body = context.getProperty(MSG_BODY).evaluateAttributeExpressions(flowFile).getValue();
final StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("<").append(priority).append(">");
if (version != null) {
messageBuilder.append(version).append(StringUtils.SPACE);
}
messageBuilder.append(timestamp).append(StringUtils.SPACE).append(hostname).append(StringUtils.SPACE).append(body);
return messageBuilder.toString();
}
private boolean isValid(final String message) {
for (final Pattern pattern : SyslogParser.MESSAGE_PATTERNS) {
final Matcher matcher = pattern.matcher(message);
if (matcher.matches()) {
return true;
}
}
return false;
}
}