blob: 80627e9be1262abcebea56cec1cc81cfbe721055 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.google.common.collect.Lists;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Processor that reads the records of an incoming flowfile with the configured
 * {@link RecordReaderFactory} and writes simple statistics back as flowfile attributes.
 *
 * <p>For every flowfile the processor writes:</p>
 * <ul>
 *   <li>{@code record.count}: the number of records read;</li>
 *   <li>{@code recordStats.<property name>}: for each dynamic property (whose value is a record
 *       path), the number of records for which the first field selected by the path has a
 *       non-null value;</li>
 *   <li>{@code recordStats.<property name>.<value>}: the number of records for which that first
 *       selected field's {@code toString()} equals {@code <value>}, subject to the limit.</li>
 * </ul>
 *
 * <p>NOTE(review): the {@code @WritesAttribute} names below end in {@code .count}, but the keys
 * built in {@link #getStats} carry no such suffix - confirm which of the two is intended.</p>
 *
 * <p>NOTE(review): the limit property is described as applying "for each record path", but
 * {@link #filterBySize} ranks the per-value counts of all record paths together and keeps the
 * top N overall - confirm which of the two is intended.</p>
 */
@Tags({ "record", "stats", "metrics" })
@CapabilityDescription("A processor that can count the number of items in a record set, as well as provide counts based on " +
        "user-defined criteria on subsets of the record set.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
    @WritesAttribute(attribute = CalculateRecordStats.RECORD_COUNT_ATTR, description = "A count of the records in the record set in the flowfile."),
    @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.count", description = "A count of the records that contain a value for the user defined property."),
    @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.<value>.count",
        description = "Each value discovered for the user defined property will have its own count attribute. " +
                "Total number of top N value counts to be added is defined by the limit configuration.")
})
public class CalculateRecordStats extends AbstractProcessor {
    /** Name of the attribute that receives the total number of records read. */
    static final String RECORD_COUNT_ATTR = "record.count";

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-stats-reader")
            .displayName("Record Reader")
            .description("A record reader to use for reading the records.")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
            .name("record-stats-limit")
            .description("Limit the number of individual stats that are returned for each record path to the top N results.")
            .required(true)
            .defaultValue("10")
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("If a flowfile is successfully processed, it goes here.")
            .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a flowfile fails to be processed, it goes here.")
            .build();

    /** Compiled record path cache; (re)created every time {@link #onEnabled} runs. */
    private RecordPathCache cache;

    static final Set<Relationship> RELATIONSHIPS;
    static final List<PropertyDescriptor> PROPERTIES;

    static {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(RECORD_READER);
        props.add(LIMIT);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    /**
     * Describes a dynamic (user-defined) property. The property value is later compiled as a
     * record path by {@link #getRecordPaths}.
     *
     * @param propertyDescriptorName name chosen by the user for the dynamic property
     * @return a dynamic, non-blank-validated descriptor that supports flowfile-attribute
     *         expression language
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .displayName(propertyDescriptorName)
                .dynamic(true)
                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();
    }

    /**
     * @return the unmodifiable list of statically declared properties (reader and limit)
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Creates a fresh record path cache. Despite the method name, this is bound to the
     * {@code @OnScheduled} lifecycle annotation.
     *
     * @param context the process context (unused)
     */
    @OnScheduled
    public void onEnabled(ProcessContext context) {
        // 25 is passed straight to the RecordPathCache constructor; presumably its capacity.
        cache = new RecordPathCache(25);
    }

    /**
     * @return the unmodifiable set containing the success and failure relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Takes one flowfile from the session, computes its statistics, adds them as attributes and
     * routes the flowfile to success. Any exception is logged and the flowfile is routed to
     * failure instead.
     *
     * @param context the process context used to resolve properties
     * @param session the session the flowfile is read from and transferred with
     * @throws ProcessException declared by the overridden method; exceptions raised while
     *         computing statistics are caught here
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile input = session.get();
        if (input == null) {
            return;
        }

        try {
            final Map<String, RecordPath> paths = getRecordPaths(context, input);
            final Map<String, String> stats = getStats(input, paths, context, session);

            input = session.putAllAttributes(input, stats);
            session.transfer(input, REL_SUCCESS);
        } catch (Exception ex) {
            getLogger().error("Error processing stats.", ex);
            session.transfer(input, REL_FAILURE);
        }
    }

    /**
     * Compiles the record path of every dynamic property, evaluating expression language
     * against the given flowfile first.
     *
     * @param context  the process context holding the configured properties
     * @param flowFile the flowfile whose attributes are used for expression evaluation
     * @return a map from dynamic property name to its compiled record path
     */
    protected Map<String, RecordPath> getRecordPaths(ProcessContext context, FlowFile flowFile) {
        return context.getProperties().keySet().stream()
                .filter(PropertyDescriptor::isDynamic)
                .collect(Collectors.toMap(
                        PropertyDescriptor::getName,
                        descriptor -> {
                            final String path = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
                            return cache.getCompiled(path);
                        }));
    }

    /**
     * Reads every record of the flowfile and counts, per record path, how many records have a
     * non-null first selected field and how often each distinct value occurs. The per-value
     * counts are then trimmed by {@link #filterBySize} and the total record count is added.
     *
     * @param flowFile the flowfile whose content is read
     * @param paths    compiled record paths keyed by dynamic property name
     * @param context  the process context used to resolve the reader and the limit
     * @param session  the session used to open the flowfile content
     * @return attribute names mapped to their counts rendered as strings
     * @throws ProcessException wrapping any exception raised while reading or counting
     */
    protected Map<String, String> getStats(FlowFile flowFile, Map<String, RecordPath> paths, ProcessContext context, ProcessSession session) {
        try (InputStream is = session.read(flowFile)) {
            final RecordReaderFactory factory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            final Integer limit = context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger();

            Map<String, Integer> counts = new HashMap<>();
            final List<String> baseKeys = new ArrayList<>();
            int recordCount = 0;

            // The reader is closed explicitly; closing only the underlying stream would leave
            // any resources held by the reader itself unreleased.
            try (RecordReader reader = factory.createRecordReader(flowFile, is, getLogger())) {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    for (Map.Entry<String, RecordPath> entry : paths.entrySet()) {
                        final RecordPathResult result = entry.getValue().evaluate(record);
                        // Only the first field selected by the path is considered.
                        final Optional<FieldValue> firstField = result.getSelectedFields().findFirst();
                        if (!firstField.isPresent()) {
                            continue;
                        }
                        final Object fieldValue = firstField.get().getValue();
                        if (fieldValue == null) {
                            continue;
                        }

                        final String baseKey = "recordStats." + entry.getKey();
                        final String valueKey = baseKey + "." + fieldValue.toString();

                        counts.merge(valueKey, 1, Integer::sum);
                        counts.merge(baseKey, 1, Integer::sum);

                        if (!baseKeys.contains(baseKey)) {
                            baseKeys.add(baseKey);
                        }
                    }

                    recordCount++;
                }
            }

            counts = filterBySize(counts, limit, baseKeys);
            counts.put(RECORD_COUNT_ATTR, recordCount);

            return counts.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            e -> e.getValue().toString()));
        } catch (Exception e) {
            getLogger().error("Could not read flowfile", e);
            throw new ProcessException(e);
        }
    }

    /**
     * Keeps every entry whose key is one of {@code baseKeys} and, of the remaining entries,
     * only the {@code limit} entries with the highest counts. The remaining entries are ranked
     * together, not per base key. Which entries survive among equal counts at the cut-off
     * depends on hash map iteration order.
     *
     * @param values   all collected counts, base keys and per-value keys mixed
     * @param limit    maximum number of non-base entries to keep; it is unboxed when compared,
     *                 so it must not be {@code null} when there are non-base entries
     * @param baseKeys keys that are always retained
     * @return a new mutable map holding the retained entries
     */
    protected Map<String, Integer> filterBySize(Map<String, Integer> values, Integer limit, List<String> baseKeys) {
        // Hash lookup instead of a linear List.contains scan for every entry.
        final Set<String> alwaysKept = new HashSet<>(baseKeys);

        final Map<String, Integer> retVal = new HashMap<>();
        final Map<String, Integer> candidates = new HashMap<>();
        for (Map.Entry<String, Integer> entry : values.entrySet()) {
            final Map<String, Integer> target = alwaysKept.contains(entry.getKey()) ? retVal : candidates;
            target.put(entry.getKey(), entry.getValue());
        }

        // Ascending sort followed by a reversed view yields highest counts first.
        final List<Map.Entry<String, Integer>> ascending = new ArrayList<>(candidates.entrySet());
        ascending.sort(Map.Entry.comparingByValue());
        final List<Map.Entry<String, Integer>> descending = Lists.reverse(ascending);

        for (int index = 0; index < descending.size() && index < limit; index++) {
            final Map.Entry<String, Integer> kept = descending.get(index);
            retVal.put(kept.getKey(), kept.getValue());
        }

        return retVal;
    }
}