/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.processors.standard.merge.RecordBinManager;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

@SideEffectFree
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"merge", "record", "content", "correlation", "stream", "event"})
@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. "
+ "This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into "
+ "a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two "
+ "FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property "
+ "is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.")
@ReadsAttributes({
@ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. "
+ "All FlowFiles with the same value for this attribute will be bundled together."),
@ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This "
+ "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same "
+ "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected "
+ "in the given bundle."),
})
@WritesAttributes({
@WritesAttribute(attribute = "record.count", description = "The merged FlowFile will have a 'record.count' attribute indicating the number of records "
+ "that were written to the FlowFile."),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"),
@WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
@WritesAttribute(attribute = "merge.uuid", description = "UUID of the merged FlowFile that will be added to the original FlowFiles attributes"),
@WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
})
@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
public class MergeRecord extends AbstractSessionFactoryProcessor {
// attributes for defragmentation
public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid";
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm",
"Bin-Packing Algorithm",
"Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally "
+ "their attributes (if the <Correlation Attribute> property is set)");
public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
"Defragment",
"Defragment",
"Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+ "have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" "
+ "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. The ordering of "
+ "the Records that are output is not guaranteed.");
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing out the records")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
.name("merge-strategy")
.displayName("Merge Strategy")
.description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by "
+ "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily "
+ "chosen FlowFiles")
.required(true)
.allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT)
.defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
.build();
public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("correlation-attribute-name")
.displayName("Correlation Attribute Name")
.description("If specified, two FlowFiles will be binned together only if they have the same value for "
+ "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.defaultValue(null)
.build();
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
.name("min-bin-size")
.displayName("Minimum Bin Size")
.description("The minimum size of for the bin")
.required(true)
.defaultValue("0 B")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
.name("max-bin-size")
.displayName("Maximum Bin Size")
.description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, "
+ "all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in last input FlowFile.")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder()
.name("min-records")
.displayName("Minimum Number of Records")
.description("The minimum number of records to include in a bin")
.required(true)
.defaultValue("1")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder()
.name("max-records")
.displayName("Maximum Number of Records")
.description("The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFIle is added to a bin, all records in that FlowFile will be added, "
+ "so this limit may be exceeded by up to the number of records in the last input FlowFile.")
.required(false)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
.name("max.bin.count")
.displayName("Maximum Number of Bins")
.description("Specifies the maximum number of bins that can be held in memory at any one time. "
+ "This number should not be smaller than the maximum number of conurrent threads for this Processor, "
+ "or the bins that are created will often consist only of a single incoming FlowFile.")
.defaultValue("10")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
.name("max-bin-age")
.displayName("Max Bin Age")
.description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> "
+ "where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final Relationship REL_MERGED = new Relationship.Builder()
.name("merged")
.description("The FlowFile containing the merged records")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The FlowFiles that were used to create the bundle")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
.build();
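
    // Manages all in-memory bins; created lazily in onTrigger() and discarded when the Processor is stopped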
private final AtomicReference<RecordBinManager> binManager = new AtomicReference<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
properties.add(MERGE_STRATEGY);
properties.add(CORRELATION_ATTRIBUTE_NAME);
properties.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
properties.add(MIN_RECORDS);
properties.add(MAX_RECORDS);
properties.add(MIN_SIZE);
properties.add(MAX_SIZE);
properties.add(MAX_BIN_AGE);
properties.add(MAX_BIN_COUNT);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
relationships.add(REL_MERGED);
return relationships;
}
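
    /**
     * When the Processor is stopped, purge any bins that are still held in memory and discard the
     * RecordBinManager so that a new one is created the next time the Processor is triggered.
     */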
@OnStopped
public final void resetState() {
final RecordBinManager manager = binManager.get();
if (manager != null) {
manager.purge();
}
binManager.set(null);
}
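
    /**
     * Verifies that the configured minimum/maximum record counts and bin sizes are positive and
     * mutually consistent (each maximum must be at least as large as the corresponding minimum).
     */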
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final Integer minRecords = validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger();
final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger();
if (minRecords != null && maxRecords != null && maxRecords < minRecords) {
results.add(new ValidationResult.Builder()
.subject("Max Records")
.input(String.valueOf(maxRecords))
.valid(false)
.explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property")
.build());
}
if (minRecords != null && minRecords <= 0) {
results.add(new ValidationResult.Builder()
.subject("Min Records")
.input(String.valueOf(minRecords))
.valid(false)
.explanation("<Minimum Number of Records> property cannot be negative or zero")
.build());
}
if (maxRecords != null && maxRecords <= 0) {
results.add(new ValidationResult.Builder()
.subject("Max Records")
.input(String.valueOf(maxRecords))
.valid(false)
.explanation("<Maximum Number of Records> property cannot be negative or zero")
.build());
}
final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
if (minSize != null && maxSize != null && maxSize < minSize) {
results.add(new ValidationResult.Builder()
.subject("Max Size")
.input(validationContext.getProperty(MAX_SIZE).getValue())
.valid(false)
.explanation("<Maximum Bin Size> property cannot be smaller than <Minimum Bin Size> property")
.build());
}
return results;
}
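
    /**
     * Each trigger lazily initializes the bin manager, pulls a batch of FlowFiles from the queue, and
     * adds each FlowFile to its bin. Bins that are full enough or whose max age has elapsed are then
     * completed and merged. If no FlowFiles were pulled and no bins were completed, the context yields.
     */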
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
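        // Lazily initialize the shared RecordBinManager; compareAndSet ensures that concurrent
        // threads racing to create it all end up using the same instance.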
RecordBinManager manager = binManager.get();
while (manager == null) {
manager = new RecordBinManager(context, sessionFactory, getLogger());
manager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
final boolean updated = binManager.compareAndSet(null, manager);
if (!updated) {
manager = binManager.get();
}
}
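
        // Pull a batch of FlowFiles: the size-based filter stops accepting FlowFiles once roughly
        // 250 KB of content or 250 FlowFiles have been gathered, whichever comes first.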
final ProcessSession session = sessionFactory.createSession();
final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
if (getLogger().isDebugEnabled()) {
final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
}
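
        // With the Defragment strategy, or when a Correlation Attribute is configured, a FlowFile
        // belongs to one specific bin, so adding it to the bin manager must block until that bin
        // is available instead of falling back to an arbitrary bin.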
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
final boolean block;
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
block = true;
} else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
block = true;
} else {
block = false;
}
try {
for (final FlowFile flowFile : flowFiles) {
try {
binFlowFile(context, flowFile, session, manager, block);
} catch (final Exception e) {
getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}
} finally {
session.commitAsync();
}
        // If there is no more data queued up, or the strategy is Defragment, complete any bin that meets our minimum thresholds.
        // Otherwise, run one more cycle to process queued FlowFiles and add more fragments into the available bins.
int completedBins = 0;
if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
try {
completedBins += manager.completeFullEnoughBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
}
}
// Complete any bins that have reached their expiration date
try {
completedBins += manager.completeExpiredBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
}
if (completedBins == 0 && flowFiles.isEmpty()) {
getLogger().debug("No FlowFiles to bin; will yield");
context.yield();
}
}
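
    /**
     * Opens the given FlowFile with the configured Record Reader, determines the group (bin) that
     * the FlowFile belongs to, and hands the FlowFile off to the RecordBinManager.
     */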
private void binFlowFile(final ProcessContext context, final FlowFile flowFile, final ProcessSession session, final RecordBinManager binManager, final boolean block) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
try (final InputStream in = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
final RecordSchema schema = reader.getSchema();
final String groupId = getGroupId(context, flowFile, schema, session);
getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, flowFile});
binManager.add(groupId, flowFile, reader, session, block);
} catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
throw new ProcessException(e);
}
}
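
    /**
     * Determines the bin identifier for a FlowFile. For the Defragment strategy this is the value of
     * the fragment.identifier attribute. Otherwise it is the schema text (falling back to the schema's
     * Avro representation), optionally concatenated with the value of the Correlation Attribute.
     */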
protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
}
final Optional<String> optionalText = schema.getSchemaText();
final String schemaText = optionalText.orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());
final String groupId;
        final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
        if (correlationAttributeName != null) {
            final String correlationAttr = flowFile.getAttribute(correlationAttributeName);
groupId = correlationAttr == null ? schemaText : schemaText + correlationAttr;
} else {
groupId = schemaText;
}
return groupId;
}
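
    // package-private accessor for the current number of bins, e.g. so that tests can inspect it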
int getBinCount() {
return binManager.get().getBinCount();
}
}