blob: e4ec6721234ce8b5cb76322bd6a267e90b310aef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.eclipse.jetty.util.StringUtil;
import com.google.common.collect.Sets;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"attributes", "logging"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Emits attributes of the FlowFile at the specified log level")
/**
 * Writes a FlowFile's standard fields (entry date, lineage start date, size), a filtered set of its
 * attributes and, optionally, its content to this processor's logger at the configured level, then
 * routes the FlowFile to {@link #REL_SUCCESS}.
 */
public class LogAttribute extends AbstractProcessor {

    public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
            .name("Log Level")
            .required(true)
            .description("The Log Level to use when logging the Attributes")
            .allowableValues(DebugLevels.values())
            .defaultValue("info")
            .build();

    public static final PropertyDescriptor ATTRIBUTES_TO_LOG_CSV = new PropertyDescriptor.Builder()
            .name("Attributes to Log")
            .required(false)
            .description("A comma-separated list of Attributes to Log. If not specified, all attributes will be logged unless `Attributes to Log by Regular Expression` is modified." +
                    " There's an AND relationship between the two properties.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor ATTRIBUTES_TO_LOG_REGEX = new PropertyDescriptor.Builder()
            .name("attributes-to-log-regex")
            .displayName("Attributes to Log by Regular Expression")
            .required(false)
            .defaultValue(".*")
            .description("A regular expression indicating the Attributes to Log. If not specified, all attributes will be logged unless `Attributes to Log` is modified." +
                    " There's an AND relationship between the two properties.")
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();

    public static final PropertyDescriptor ATTRIBUTES_TO_IGNORE_CSV = new PropertyDescriptor.Builder()
            .name("Attributes to Ignore")
            .description("A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored unless `Attributes to Ignore by Regular Expression` is modified." +
                    " There's an OR relationship between the two properties.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor ATTRIBUTES_TO_IGNORE_REGEX = new PropertyDescriptor.Builder()
            .name("attributes-to-ignore-regex")
            .displayName("Attributes to Ignore by Regular Expression")
            .required(false)
            .description("A regular expression indicating the Attributes to Ignore. If not specified, no attributes will be ignored unless `Attributes to Ignore` is modified." +
                    " There's an OR relationship between the two properties.")
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();

    public static final PropertyDescriptor LOG_PAYLOAD = new PropertyDescriptor.Builder()
            .name("Log Payload")
            .required(true)
            .description("If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.")
            .defaultValue("false")
            .allowableValues("true", "false")
            .build();

    public static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
            .name("Log prefix")
            .required(false)
            .description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("character-set")
            .displayName("Character Set")
            .description("The name of the CharacterSet to use")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue(Charset.defaultCharset().name())
            .required(true)
            .build();

    /** Separator line used around the attribute dump when no log prefix is configured. */
    public static final String FIFTY_DASHES = "--------------------------------------------------";

    /** Log levels selectable through the {@link #LOG_LEVEL} property. */
    public static enum DebugLevels {
        trace, debug, info, warn, error
    }

    /** Payloads whose size is at or above this many bytes are not included in the log output. */
    public static final long ONE_MB = 1024 * 1024;

    /** Number of FlowFiles passed straight through per trigger when the configured level is disabled. */
    private static final int PASS_THROUGH_BATCH_SIZE = 50;

    /** Width the (abbreviated) log prefix is centered within, padded with dashes. */
    private static final int PREFIX_WIDTH = 40;

    /** Number of dashes added on each side of the centered log prefix. */
    private static final int PREFIX_SIDE_DASHES = 5;

    /** Format of a single key/value entry in the log message. */
    private static final String ATTRIBUTE_FORMAT = "\nKey: '%1$s'\n\tValue: '%2$s'";

    /** Splits a comma-separated list, trimming whitespace around each comma. */
    private static final String CSV_SPLIT_REGEX = "\\s*,\\s*";

    private Set<Relationship> relationships;
    private List<PropertyDescriptor> supportedDescriptors;

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles are routed to this relationship")
            .build();

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> procRels = new HashSet<>();
        procRels.add(REL_SUCCESS);
        relationships = Collections.unmodifiableSet(procRels);

        final List<PropertyDescriptor> supDescriptors = new ArrayList<>();
        supDescriptors.add(LOG_LEVEL);
        supDescriptors.add(LOG_PAYLOAD);
        supDescriptors.add(ATTRIBUTES_TO_LOG_CSV);
        supDescriptors.add(ATTRIBUTES_TO_LOG_REGEX);
        supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
        supDescriptors.add(ATTRIBUTES_TO_IGNORE_REGEX);
        supDescriptors.add(LOG_PREFIX);
        supDescriptors.add(CHARSET);
        supportedDescriptors = Collections.unmodifiableList(supDescriptors);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return supportedDescriptors;
    }

    /**
     * Builds the log message for a single FlowFile and emits it at {@code logLevel}.
     *
     * @param logger   logger to write to; when {@code null}, this processor's {@link #getLogger()} is used
     * @param logLevel level at which the message is emitted
     * @param flowFile FlowFile whose attributes (and optionally content) are logged
     * @param session  session used to read the FlowFile content when payload logging is enabled
     * @param context  process context supplying the configured properties
     * @return the message that was logged (trimmed of leading/trailing whitespace)
     */
    protected String processFlowFile(final ComponentLog logger, final DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, final ProcessContext context) {
        // Honour the logger supplied by the caller instead of silently ignoring it.
        final ComponentLog effectiveLogger = logger == null ? getLogger() : logger;

        final Set<String> attributeKeys = getAttributesToLog(flowFile.getAttributes().keySet(), context);
        final String logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        final String dashedLine = buildDashedLine(logPrefix);

        // Pretty print metadata
        final StringBuilder message = new StringBuilder();
        message.append("logging for flow file ").append(flowFile);
        message.append("\n");
        message.append(dashedLine);
        message.append("\nStandard FlowFile Attributes");
        appendEntry(message, "entryDate", new Date(flowFile.getEntryDate()));
        appendEntry(message, "lineageStartDate", new Date(flowFile.getLineageStartDate()));
        appendEntry(message, "fileSize", flowFile.getSize());
        message.append("\nFlowFile Attribute Map Content");
        for (final String key : attributeKeys) {
            appendEntry(message, key, flowFile.getAttribute(key));
        }
        message.append("\n");
        message.append(dashedLine);

        // The user can request to log the payload; content of ONE_MB bytes or more is skipped.
        if (context.getProperty(LOG_PAYLOAD).asBoolean()) {
            message.append("\n");
            if (flowFile.getSize() < ONE_MB) {
                final FlowFilePayloadCallback callback = new FlowFilePayloadCallback(charset);
                session.read(flowFile, callback);
                message.append(callback.getContents());
            } else {
                message.append("\n Not including payload since it is larger than one mb.");
            }
        }

        final String outputMessage = message.toString().trim();
        logAtLevel(effectiveLogger, logLevel, outputMessage);
        return outputMessage;
    }

    /**
     * Builds the separator line placed before and after the attribute dump.
     * Without a (non-blank) prefix this is {@link #FIFTY_DASHES}; otherwise the prefix is
     * abbreviated to {@link #PREFIX_WIDTH} characters, centered in dashes, and framed by
     * {@link #PREFIX_SIDE_DASHES} dashes on each side.
     */
    private static String buildDashedLine(final String logPrefix) {
        if (StringUtils.isBlank(logPrefix)) {
            return FIFTY_DASHES;
        }
        final String centeredPrefix = StringUtils.center(StringUtils.abbreviate(logPrefix, PREFIX_WIDTH), PREFIX_WIDTH, '-');
        final String sideDashes = StringUtils.repeat('-', PREFIX_SIDE_DASHES);
        return sideDashes + centeredPrefix + sideDashes;
    }

    /** Appends one key/value entry, formatted with {@link #ATTRIBUTE_FORMAT}, to {@code message}. */
    private static void appendEntry(final StringBuilder message, final String key, final Object value) {
        message.append(String.format(ATTRIBUTE_FORMAT, key, value));
    }

    /** Writes {@code message} to {@code logger} at the given level. */
    private static void logAtLevel(final ComponentLog logger, final DebugLevels logLevel, final String message) {
        switch (logLevel) {
            case trace:
                logger.trace(message);
                break;
            case debug:
                logger.debug(message);
                break;
            case info:
                logger.info(message);
                break;
            case warn:
                logger.warn(message);
                break;
            case error:
                logger.error(message);
                break;
            default:
                logger.debug(message);
        }
    }

    /** Returns whether {@code logger} would emit messages at the given level. */
    private static boolean isLevelEnabled(final ComponentLog logger, final DebugLevels logLevel) {
        switch (logLevel) {
            case trace:
                return logger.isTraceEnabled();
            case debug:
                return logger.isDebugEnabled();
            case info:
                return logger.isInfoEnabled();
            case warn:
                return logger.isWarnEnabled();
            case error:
                return logger.isErrorEnabled();
            default:
                return false;
        }
    }

    /**
     * Selects which attribute keys to log, sorted alphabetically.
     * A key is logged when it passes the "to log" filters (CSV list AND regex) and is not excluded by
     * either "to ignore" filter (CSV list OR regex).
     */
    private Set<String> getAttributesToLog(final Set<String> flowFileAttrKeys, final ProcessContext context) {
        final String attrsToLogValue = context.getProperty(ATTRIBUTES_TO_LOG_CSV).getValue();
        final String attrsToRemoveValue = context.getProperty(ATTRIBUTES_TO_IGNORE_CSV).getValue();
        final String attrsToRemoveRegexValue = context.getProperty(ATTRIBUTES_TO_IGNORE_REGEX).getValue();

        final Set<String> attrsToLog = StringUtils.isBlank(attrsToLogValue) ? Sets.newHashSet(flowFileAttrKeys) : Sets.newHashSet(attrsToLogValue.split(CSV_SPLIT_REGEX));
        final Set<String> attrsToRemove = StringUtils.isBlank(attrsToRemoveValue) ? Sets.newHashSet() : Sets.newHashSet(attrsToRemoveValue.split(CSV_SPLIT_REGEX));
        final Pattern attrsToLogRegex = Pattern.compile(context.getProperty(ATTRIBUTES_TO_LOG_REGEX).getValue());
        final Pattern attrsToRemoveRegex = attrsToRemoveRegexValue == null ? null : Pattern.compile(attrsToRemoveRegexValue);

        return flowFileAttrKeys.stream()
                // Keep a key if it is listed (or the list parsed to nothing) AND it matches the "to log" regex.
                .filter(candidate -> (attrsToLog.isEmpty() || attrsToLog.contains(candidate))
                        && attrsToLogRegex.matcher(candidate).matches())
                // Drop a key that is explicitly ignored OR matches the "to ignore" regex.
                .filter(candidate -> !attrsToRemove.contains(candidate)
                        && (attrsToRemoveRegex == null || !attrsToRemoveRegex.matcher(candidate).matches()))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    /** Routes up to {@link #PASS_THROUGH_BATCH_SIZE} queued FlowFiles to success without logging them. */
    private void transferChunk(final ProcessSession session) {
        final List<FlowFile> flowFiles = session.get(PASS_THROUGH_BATCH_SIZE);
        if (!flowFiles.isEmpty()) {
            session.transfer(flowFiles, REL_SUCCESS);
        }
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        final String logLevelValue = context.getProperty(LOG_LEVEL).getValue().toLowerCase(Locale.ROOT);
        final DebugLevels logLevel;
        try {
            logLevel = DebugLevels.valueOf(logLevelValue);
        } catch (final IllegalArgumentException e) {
            throw new ProcessException("Unsupported log level: " + logLevelValue, e);
        }

        final ComponentLog logger = getLogger();
        if (!isLevelEnabled(logger, logLevel)) {
            // Nothing would be logged; pass a batch through without building messages.
            transferChunk(session);
            return;
        }

        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        processFlowFile(logger, logLevel, flowFile, session, context);
        session.transfer(flowFile, REL_SUCCESS);
    }

    /** Reads an entire FlowFile content stream into a String using the supplied charset. */
    protected static class FlowFilePayloadCallback implements InputStreamCallback {
        private String contents = "";
        private final Charset charset;

        public FlowFilePayloadCallback(Charset charset) {
            this.charset = charset;
        }

        @Override
        public void process(final InputStream in) throws IOException {
            contents = IOUtils.toString(in, charset);
        }

        /** Returns the content read by {@link #process}, or an empty string if nothing was read. */
        public String getContents() {
            return contents;
        }
    }
}